From 1d347dd2142a266552812ac2f8844acf52d2dc1c Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Tue, 12 Aug 2025 14:00:28 +0200 Subject: feat(config): reload config --- crates/configuration/src/main.rs | 54 ++++++-- crates/configuration/src/server.rs | 46 +++---- crates/configuration/src/server/grpc_svc.rs | 24 ++++ crates/configuration/src/server/http_svc.rs | 2 +- crates/configuration/src/server/http_svc/routes.rs | 2 +- .../server/http_svc/routes/routing/get_active.rs | 14 +- crates/configuration/src/server/interceptor.rs | 23 ---- crates/configuration/src/server/reload_stream.rs | 21 +++ crates/configuration/src/state.rs | 44 +++++- crates/configuration/src/state/routing.rs | 3 +- .../src/state/routing/mutate_routing.rs | 150 +++++++++++++++++++++ crates/configuration/src/state/routing/query.rs | 106 --------------- .../src/state/routing/query_routing.rs | 106 +++++++++++++++ 13 files changed, 419 insertions(+), 176 deletions(-) create mode 100644 crates/configuration/src/server/grpc_svc.rs delete mode 100644 crates/configuration/src/server/interceptor.rs create mode 100644 crates/configuration/src/server/reload_stream.rs create mode 100644 crates/configuration/src/state/routing/mutate_routing.rs delete mode 100644 crates/configuration/src/state/routing/query.rs create mode 100644 crates/configuration/src/state/routing/query_routing.rs (limited to 'crates/configuration/src') diff --git a/crates/configuration/src/main.rs b/crates/configuration/src/main.rs index 7eb5e3b..7dc8da6 100644 --- a/crates/configuration/src/main.rs +++ b/crates/configuration/src/main.rs @@ -2,9 +2,13 @@ mod cnfg; mod server; mod state; +use std::net::{Ipv6Addr, SocketAddr}; + +use crate::{server::error::AppError, state::AppState}; +use axum::http::header::CONTENT_TYPE; use clap::Parser; -use tracing::error; -use warden_config::state::AppState; +use tower::{make::Shared, steer::Steer}; +use tracing::{error, info}; use warden_stack::{Configuration, Services, tracing::Tracing}; /// warden-config @@ -17,7 +21,7 @@ struct Args { } #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<(), AppError> { let args = Args::parse(); let config = include_str!("../warden-config.toml"); @@ -41,7 +45,7 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(tracing.loki_task); - let services = Services::builder() + let mut services = Services::builder() .postgres(&config.database) .await .inspect_err(|e| error!("database: {e}"))? @@ -68,9 +72,43 @@ async fn main() -> anyhow::Result<()> { .take() .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; - let state = AppState::new(services, config, Some(provider)) - .await - .inspect_err(|e| error!("{e}"))?; + let state = AppState::create( + crate::state::Services { + postgres, + cache, + jetstream, + }, + &config, + ) + .await?; + + let (app, grpc_server) = server::serve(state)?; + + let service = Steer::new( + vec![app, grpc_server], + |req: &axum::extract::Request, _services: &[_]| { + if req + .headers() + .get(CONTENT_TYPE) + .map(|content_type| content_type.as_bytes()) + .filter(|content_type| content_type.starts_with(b"application/grpc")) + .is_some() + { + // grpc service + 1 + } else { + // http service + 0 + } + }, + ); + + let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port)); + + let listener = tokio::net::TcpListener::bind(addr).await?; + info!(port = addr.port(), "starting config-api"); + + axum::serve(listener, Shared::new(service)).await?; - server::serve(state); + Ok(()) } diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs index 1131144..dec0813 100644 --- a/crates/configuration/src/server.rs +++ b/crates/configuration/src/server.rs @@ -1,13 +1,13 @@ -mod error; +pub mod error; +pub mod grpc_svc; mod http_svc; -mod interceptor; +pub mod reload_stream; mod version; -use axum::http::header::CONTENT_TYPE; +use grpc_svc::interceptor::MyInterceptor; use http_svc::build_router; -use interceptor::MyInterceptor; use tonic::service::Routes; -use tower::{make::Shared, steer::Steer}; +use tower_http::trace::TraceLayer; use warden_core::{ FILE_DESCRIPTOR_SET, configuration::routing::{ @@ -15,9 +15,9 @@ use warden_core::{ }, }; -use crate::state::AppHandle; +use crate::{server::error::AppError, state::AppHandle}; -pub async fn serve(state: AppHandle) -> anyhow::Result> { +pub fn serve(state: AppHandle) -> Result<(axum::Router, axum::Router), AppError> { let app = build_router(state.clone()); let service = QueryRoutingServer::with_interceptor(state.clone(), MyInterceptor); @@ -32,26 +32,14 @@ pub async fn serve(state: AppHandle) -> anyhow::Result> { MyInterceptor, )) .add_service(routing_reflector) - .into_axum_router(); - - let service = Steer::new( - vec![app, grpc_server], - |req: &axum::extract::Request, _services: &[_]| { - if req - .headers() - .get(CONTENT_TYPE) - .map(|content_type| content_type.as_bytes()) - .filter(|content_type| content_type.starts_with(b"application/grpc")) - .is_some() - { - // grpc service - 1 - } else { - // http service - 0 - } - }, - ); - - Ok(Shared::new(service)) + .into_axum_router() + .layer( + TraceLayer::new_for_grpc().make_span_with(|request: &axum::http::Request<_>| { + tracing::trace_span!(env!("CARGO_PKG_NAME"), "otel.kind" = "server", + headers = ?request.headers() + ) + }), + ); + + Ok((app, grpc_server)) } diff --git a/crates/configuration/src/server/grpc_svc.rs b/crates/configuration/src/server/grpc_svc.rs new file mode 100644 index 0000000..42aa871 --- /dev/null +++ b/crates/configuration/src/server/grpc_svc.rs @@ -0,0 +1,24 @@ +pub mod interceptor { + use opentelemetry::global; + use tonic::{Status, service::Interceptor}; + use tracing::Span; + use tracing_opentelemetry::OpenTelemetrySpanExt; + use warden_stack::tracing::telemetry::tonic::extractor; + + #[derive(Clone, Copy)] + pub struct MyInterceptor; + + impl Interceptor for MyInterceptor { + fn call(&mut self, request: tonic::Request<()>) -> Result, Status> { + let span = Span::current(); + + let cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor::MetadataMap(request.metadata())) + }); + + span.set_parent(cx); + + Ok(request) + } + } +} diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs index 7b2a258..45cdcfa 100644 --- a/crates/configuration/src/server/http_svc.rs +++ b/crates/configuration/src/server/http_svc.rs @@ -8,7 +8,7 @@ use utoipa_redoc::Servable; #[cfg(feature = "scalar")] use utoipa_scalar::Servable as _; -use crate::{server::http_svc, state::AppHandle}; +use crate::state::AppHandle; const TAG_ROUTING: &str = "Routing"; diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs index cc065e8..e70dccd 100644 --- a/crates/configuration/src/server/http_svc/routes.rs +++ b/crates/configuration/src/server/http_svc/routes.rs @@ -6,6 +6,6 @@ use crate::state::AppHandle; pub fn router(store: AppHandle) -> OpenApiRouter { OpenApiRouter::new() - .routes(routes!(routing::get_active)) + .routes(routes!(routing::get_active::active_routing)) .with_state(store) } diff --git a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs index 562a1f1..9dc22e3 100644 --- a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs +++ b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs @@ -1,5 +1,9 @@ use axum::{extract::State, response::IntoResponse}; -use warden_core::configuration::routing::RoutingConfiguration; +use tonic::IntoRequest; +use warden_core::{ + configuration::routing::{RoutingConfiguration, query_routing_server::QueryRouting}, + google, +}; use crate::{ server::{error::AppError, http_svc::TAG_ROUTING, version::Version}, @@ -22,9 +26,13 @@ use crate::{ ] #[axum::debug_handler] #[tracing::instrument(skip(state), err(Debug), fields(method = "GET"))] -pub(super) async fn active_routing( +pub async fn active_routing( version: Version, State(state): State, ) -> Result { - Ok(String::default().into_response()) + let config = state + .get_active_routing_configuration(google::protobuf::Empty::default().into_request()) + .await? + .into_inner(); + Ok(axum::Json(config.configuration).into_response()) } diff --git a/crates/configuration/src/server/interceptor.rs b/crates/configuration/src/server/interceptor.rs deleted file mode 100644 index eeb36c2..0000000 --- a/crates/configuration/src/server/interceptor.rs +++ /dev/null @@ -1,23 +0,0 @@ -use tonic::{Status, service::Interceptor}; -use tracing::Span; -use warden_stack::{ - opentelemetry::global, tracing::telemetry::tonic::extractor, - tracing_opentelemetry::OpenTelemetrySpanExt, -}; - -#[derive(Clone, Copy)] -pub struct MyInterceptor; - -impl Interceptor for MyInterceptor { - fn call(&mut self, request: tonic::Request<()>) -> Result, Status> { - let span = Span::current(); - - let cx = global::get_text_map_propagator(|propagator| { - propagator.extract(&extractor::MetadataMap(request.metadata())) - }); - - span.set_parent(cx); - - Ok(request) - } -} diff --git a/crates/configuration/src/server/reload_stream.rs b/crates/configuration/src/server/reload_stream.rs new file mode 100644 index 0000000..d2ee4ab --- /dev/null +++ b/crates/configuration/src/server/reload_stream.rs @@ -0,0 +1,21 @@ +use async_nats::jetstream::Context; +use tracing::{debug, info}; + +use crate::cnfg::JetstreamConfig; + +pub async fn create_stream(jetstream: &Context, config: &JetstreamConfig) -> anyhow::Result<()> { + debug!(name = ?config.stream, "initialising stream"); + + jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: config.stream.to_string(), + max_messages: config.max_messages, + subjects: vec![config.subject.to_string()], + ..Default::default() + }) + .await?; + + info!(name = ?config.stream, subject = ?config.subject, "stream is ready"); + + Ok(()) +} diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs index 7672891..5a51d5b 100644 --- a/crates/configuration/src/state.rs +++ b/crates/configuration/src/state.rs @@ -4,13 +4,14 @@ mod routing; use async_nats::jetstream::Context; use sqlx::PgPool; use std::{ops::Deref, sync::Arc}; -use tonic::transport::Endpoint; -use tracing::error; -use warden_stack::{Configuration, cache::RedisManager}; +use tracing::{instrument, trace}; +use warden_core::configuration::ReloadEvent; +use warden_stack::{Configuration, cache::RedisManager, redis::AsyncCommands}; use crate::{ cnfg::LocalConfig, - server::grpc::interceptor::{Intercepted, MyInterceptor}, + server::{error::AppError, reload_stream::create_stream}, + state::cache_key::CacheKey, }; #[derive(Clone)] @@ -43,9 +44,44 @@ impl AppState { ) -> Result { let local_config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + create_stream(&services.jetstream, &local_config.nats).await?; + Ok(AppHandle(Arc::new(Self { services, app_config: local_config, }))) } } + +#[instrument(skip(state), err(Debug))] +pub async fn invalidate_cache(state: &AppHandle, key: CacheKey<'_>) -> Result<(), tonic::Status> { + trace!("invalidating cache"); + let mut cache = state + .services + .cache + .get() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + cache + .del::<_, ()>(key) + .await + .map_err(|e| tonic::Status::internal(e.to_string())) +} + +#[instrument(skip(state), err(Debug))] +pub async fn publish_reload( + state: &AppHandle, + prefix: &str, + event: ReloadEvent, +) -> Result<(), tonic::Status> { + trace!("publishing reload event"); + state + .services + .jetstream + .publish(format!("{prefix}.reload"), event.as_str_name().into()) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + Ok(()) +} diff --git a/crates/configuration/src/state/routing.rs b/crates/configuration/src/state/routing.rs index ea51c17..d71af04 100644 --- a/crates/configuration/src/state/routing.rs +++ b/crates/configuration/src/state/routing.rs @@ -1 +1,2 @@ -mod query; +mod mutate_routing; +mod query_routing; diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs new file mode 100644 index 0000000..0c0637a --- /dev/null +++ b/crates/configuration/src/state/routing/mutate_routing.rs @@ -0,0 +1,150 @@ +use opentelemetry_semantic_conventions::attribute; +use tonic::{Request, Response, Status, async_trait}; +use tracing::{Instrument, error, info_span, instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; +use warden_core::configuration::{ + ReloadEvent, + routing::{ + DeleteConfigurationRequest, RoutingConfiguration, UpdateRoutingRequest, + mutate_routing_server::MutateRouting, + }, +}; + +use crate::state::{AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload}; + +#[allow(dead_code)] +struct RoutingRow { + id: Uuid, + configuration: sqlx::types::Json, +} + +#[async_trait] +impl MutateRouting for AppHandle { + #[instrument(skip(self, request))] + async fn create_routing_configuration( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let span = info_span!("create.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + sqlx::query!( + "insert into routing (id, configuration) values ($1, $2)", + Uuid::now_v7(), + sqlx::types::Json(&request) as _ + ) + .execute(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + Ok(tonic::Response::new(request)) + } + + async fn update_routing_configuration( + &self, + request: Request, + ) -> Result, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + let id = Uuid::parse_str(&request.id) + .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?; + + let config = request.configuration.expect("configuration to be provided"); + + let span = info_span!("update.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "update"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + let updated = sqlx::query_as!( + RoutingRow, + r#" + update routing + set configuration = $1 + where id = $2 + returning id, configuration as "configuration: sqlx::types::Json" + "#, + sqlx::types::Json(&config) as _, + id + ) + .fetch_one(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache(self, CacheKey::Routing(&id)), + publish_reload(self, conf, ReloadEvent::Routing) + )?; + + let res = updated.configuration.0; + + Ok(Response::new(res)) + } + + async fn delete_routing_configuration( + &self, + request: Request, + ) -> Result, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + let id = Uuid::parse_str(&request.id) + .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?; + + let span = info_span!("delete.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "delete"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + let updated = sqlx::query_as!( + RoutingRow, + r#" + delete from routing + where id = $1 + returning id, configuration as "configuration: sqlx::types::Json" + "#, + id + ) + .fetch_one(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache(self, CacheKey::Routing(&id)), + publish_reload(self, conf, ReloadEvent::Routing) + )?; + + let res = updated.configuration.0; + + Ok(Response::new(res)) + } +} diff --git a/crates/configuration/src/state/routing/query.rs b/crates/configuration/src/state/routing/query.rs deleted file mode 100644 index 3c6814d..0000000 --- a/crates/configuration/src/state/routing/query.rs +++ /dev/null @@ -1,106 +0,0 @@ -use opentelemetry_semantic_conventions::attribute; -use prost::Message; -use tonic::{Request, Status}; -use tracing::{Instrument, debug, info_span, instrument, warn}; -use tracing_opentelemetry::OpenTelemetrySpanExt; -use warden_stack::redis::AsyncCommands; - -use uuid::Uuid; -use warden_core::{ - configuration::routing::{ - GetActiveRoutingResponse, RoutingConfiguration, query_routing_server::QueryRouting, - }, - google, -}; - -use crate::state::{AppHandle, cache_key::CacheKey}; - -pub struct RoutingRow { - id: Uuid, - configuration: sqlx::types::Json, -} - -#[tonic::async_trait] -impl QueryRouting for AppHandle { - #[instrument(skip(self, _request), Err(Debug))] - async fn get_active_routing_configuration( - &self, - _request: Request, - ) -> Result, Status> { - let mut cache = self - .services - .cache - .get() - .await - .map_err(|e| tonic::Status::internal(e.to_string()))?; - - let span = info_span!("cache.get"); - span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); - span.set_attribute(attribute::DB_OPERATION_NAME, "get"); - span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); - let routing_config = cache - .get::<_, Vec>(CacheKey::ActiveRouting) - .instrument(span) - .await - .map(|value| { - if !value.is_empty() { - RoutingConfiguration::decode(value.as_ref()).ok() - } else { - None - } - }); - - if let Ok(Some(routing_config)) = routing_config { - if routing_config.active { - return Ok(tonic::Response::new(GetActiveRoutingResponse { - configuration: Some(routing_config), - })); - } - } - - let span = info_span!("db.get.routing.active"); - span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); - span.set_attribute(attribute::DB_OPERATION_NAME, "select"); - span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); - span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); - - let config = sqlx::query_as!( - RoutingRow, - r#"select id, configuration as "configuration: sqlx::types::Json" from routing where - configuration->>'active' = 'true'"#, - ) - .fetch_optional(&self.services.postgres) - .instrument(span) - .await.map_err(|e| tonic::Status::internal(e.to_string()))?; - - let config = config.map(|transaction| { - debug!(id = ?transaction.id, "found active config"); - transaction.configuration.0 - }); - - match config { - Some(config) => { - let bytes = config.encode_to_vec(); - let span = info_span!("cache.set"); - span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); - span.set_attribute(attribute::DB_OPERATION_NAME, "set"); - span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active"); - - if let Err(e) = cache - .set::<_, _, ()>(CacheKey::ActiveRouting, bytes) - .instrument(span) - .await - { - warn!("{e}"); - }; - - Ok(tonic::Response::new(GetActiveRoutingResponse { - configuration: Some(config), - })) - } - None => Ok(tonic::Response::new(GetActiveRoutingResponse { - configuration: None, - })), - } - } -} diff --git a/crates/configuration/src/state/routing/query_routing.rs b/crates/configuration/src/state/routing/query_routing.rs new file mode 100644 index 0000000..3c6814d --- /dev/null +++ b/crates/configuration/src/state/routing/query_routing.rs @@ -0,0 +1,106 @@ +use opentelemetry_semantic_conventions::attribute; +use prost::Message; +use tonic::{Request, Status}; +use tracing::{Instrument, debug, info_span, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_stack::redis::AsyncCommands; + +use uuid::Uuid; +use warden_core::{ + configuration::routing::{ + GetActiveRoutingResponse, RoutingConfiguration, query_routing_server::QueryRouting, + }, + google, +}; + +use crate::state::{AppHandle, cache_key::CacheKey}; + +pub struct RoutingRow { + id: Uuid, + configuration: sqlx::types::Json, +} + +#[tonic::async_trait] +impl QueryRouting for AppHandle { + #[instrument(skip(self, _request), Err(Debug))] + async fn get_active_routing_configuration( + &self, + _request: Request, + ) -> Result, Status> { + let mut cache = self + .services + .cache + .get() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let span = info_span!("cache.get"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "get"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + let routing_config = cache + .get::<_, Vec>(CacheKey::ActiveRouting) + .instrument(span) + .await + .map(|value| { + if !value.is_empty() { + RoutingConfiguration::decode(value.as_ref()).ok() + } else { + None + } + }); + + if let Ok(Some(routing_config)) = routing_config { + if routing_config.active { + return Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(routing_config), + })); + } + } + + let span = info_span!("db.get.routing.active"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "select"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + + let config = sqlx::query_as!( + RoutingRow, + r#"select id, configuration as "configuration: sqlx::types::Json" from routing where + configuration->>'active' = 'true'"#, + ) + .fetch_optional(&self.services.postgres) + .instrument(span) + .await.map_err(|e| tonic::Status::internal(e.to_string()))?; + + let config = config.map(|transaction| { + debug!(id = ?transaction.id, "found active config"); + transaction.configuration.0 + }); + + match config { + Some(config) => { + let bytes = config.encode_to_vec(); + let span = info_span!("cache.set"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "set"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active"); + + if let Err(e) = cache + .set::<_, _, ()>(CacheKey::ActiveRouting, bytes) + .instrument(span) + .await + { + warn!("{e}"); + }; + + Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(config), + })) + } + None => Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: None, + })), + } + } +} -- cgit v1.2.3