From 1d347dd2142a266552812ac2f8844acf52d2dc1c Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Tue, 12 Aug 2025 14:00:28 +0200 Subject: feat(config): reload config --- .../src/state/routing/mutate_routing.rs | 150 +++++++++++++++++++++ crates/configuration/src/state/routing/query.rs | 106 --------------- .../src/state/routing/query_routing.rs | 106 +++++++++++++++ 3 files changed, 256 insertions(+), 106 deletions(-) create mode 100644 crates/configuration/src/state/routing/mutate_routing.rs delete mode 100644 crates/configuration/src/state/routing/query.rs create mode 100644 crates/configuration/src/state/routing/query_routing.rs (limited to 'crates/configuration/src/state/routing') diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs new file mode 100644 index 0000000..0c0637a --- /dev/null +++ b/crates/configuration/src/state/routing/mutate_routing.rs @@ -0,0 +1,150 @@ +use opentelemetry_semantic_conventions::attribute; +use tonic::{Request, Response, Status, async_trait}; +use tracing::{Instrument, error, info_span, instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; +use warden_core::configuration::{ + ReloadEvent, + routing::{ + DeleteConfigurationRequest, RoutingConfiguration, UpdateRoutingRequest, + mutate_routing_server::MutateRouting, + }, +}; + +use crate::state::{AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload}; + +#[allow(dead_code)] +struct RoutingRow { + id: Uuid, + configuration: sqlx::types::Json, +} + +#[async_trait] +impl MutateRouting for AppHandle { + #[instrument(skip(self, request))] + async fn create_routing_configuration( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let span = info_span!("create.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + sqlx::query!( + "insert into routing (id, configuration) values ($1, $2)", + Uuid::now_v7(), + sqlx::types::Json(&request) as _ + ) + .execute(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + Ok(tonic::Response::new(request)) + } + + async fn update_routing_configuration( + &self, + request: Request, + ) -> Result, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + let id = Uuid::parse_str(&request.id) + .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?; + + let config = request.configuration.expect("configuration to be provided"); + + let span = info_span!("update.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "update"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + let updated = sqlx::query_as!( + RoutingRow, + r#" + update routing + set configuration = $1 + where id = $2 + returning id, configuration as "configuration: sqlx::types::Json" + "#, + sqlx::types::Json(&config) as _, + id + ) + .fetch_one(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache(self, CacheKey::Routing(&id)), + publish_reload(self, conf, ReloadEvent::Routing) + )?; + + let res = updated.configuration.0; + + Ok(Response::new(res)) + } + + async fn delete_routing_configuration( + &self, + request: Request, + ) -> Result, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + let id = Uuid::parse_str(&request.id) + .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?; + + let span = info_span!("delete.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "delete"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + let updated = sqlx::query_as!( + RoutingRow, + r#" + delete from routing + where id = $1 + returning id, configuration as "configuration: sqlx::types::Json" + "#, + id + ) + .fetch_one(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache(self, CacheKey::Routing(&id)), + publish_reload(self, conf, ReloadEvent::Routing) + )?; + + let res = updated.configuration.0; + + Ok(Response::new(res)) + } +} diff --git a/crates/configuration/src/state/routing/query.rs b/crates/configuration/src/state/routing/query.rs deleted file mode 100644 index 3c6814d..0000000 --- a/crates/configuration/src/state/routing/query.rs +++ /dev/null @@ -1,106 +0,0 @@ -use opentelemetry_semantic_conventions::attribute; -use prost::Message; -use tonic::{Request, Status}; -use tracing::{Instrument, debug, info_span, instrument, warn}; -use tracing_opentelemetry::OpenTelemetrySpanExt; -use warden_stack::redis::AsyncCommands; - -use uuid::Uuid; -use warden_core::{ - configuration::routing::{ - GetActiveRoutingResponse, RoutingConfiguration, query_routing_server::QueryRouting, - }, - google, -}; - -use crate::state::{AppHandle, cache_key::CacheKey}; - -pub struct RoutingRow { - id: Uuid, - configuration: sqlx::types::Json, -} - -#[tonic::async_trait] -impl QueryRouting for AppHandle { - #[instrument(skip(self, _request), Err(Debug))] - async fn get_active_routing_configuration( - &self, - _request: Request, - ) -> Result, Status> { - let mut cache = self - .services - .cache - .get() - .await - .map_err(|e| tonic::Status::internal(e.to_string()))?; - - let span = info_span!("cache.get"); - span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); - span.set_attribute(attribute::DB_OPERATION_NAME, "get"); - span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); - let routing_config = cache - .get::<_, Vec>(CacheKey::ActiveRouting) - .instrument(span) - .await - .map(|value| { - if !value.is_empty() { - RoutingConfiguration::decode(value.as_ref()).ok() - } else { - None - } - }); - - if let Ok(Some(routing_config)) = routing_config { - if routing_config.active { - return Ok(tonic::Response::new(GetActiveRoutingResponse { - configuration: Some(routing_config), - })); - } - } - - let span = info_span!("db.get.routing.active"); - span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); - span.set_attribute(attribute::DB_OPERATION_NAME, "select"); - span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); - span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); - - let config = sqlx::query_as!( - RoutingRow, - r#"select id, configuration as "configuration: sqlx::types::Json" from routing where - configuration->>'active' = 'true'"#, - ) - .fetch_optional(&self.services.postgres) - .instrument(span) - .await.map_err(|e| tonic::Status::internal(e.to_string()))?; - - let config = config.map(|transaction| { - debug!(id = ?transaction.id, "found active config"); - transaction.configuration.0 - }); - - match config { - Some(config) => { - let bytes = config.encode_to_vec(); - let span = info_span!("cache.set"); - span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); - span.set_attribute(attribute::DB_OPERATION_NAME, "set"); - span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active"); - - if let Err(e) = cache - .set::<_, _, ()>(CacheKey::ActiveRouting, bytes) - .instrument(span) - .await - { - warn!("{e}"); - }; - - Ok(tonic::Response::new(GetActiveRoutingResponse { - configuration: Some(config), - })) - } - None => Ok(tonic::Response::new(GetActiveRoutingResponse { - configuration: None, - })), - } - } -} diff --git a/crates/configuration/src/state/routing/query_routing.rs b/crates/configuration/src/state/routing/query_routing.rs new file mode 100644 index 0000000..3c6814d --- /dev/null +++ b/crates/configuration/src/state/routing/query_routing.rs @@ -0,0 +1,106 @@ +use opentelemetry_semantic_conventions::attribute; +use prost::Message; +use tonic::{Request, Status}; +use tracing::{Instrument, debug, info_span, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_stack::redis::AsyncCommands; + +use uuid::Uuid; +use warden_core::{ + configuration::routing::{ + GetActiveRoutingResponse, RoutingConfiguration, query_routing_server::QueryRouting, + }, + google, +}; + +use crate::state::{AppHandle, cache_key::CacheKey}; + +pub struct RoutingRow { + id: Uuid, + configuration: sqlx::types::Json, +} + +#[tonic::async_trait] +impl QueryRouting for AppHandle { + #[instrument(skip(self, _request), Err(Debug))] + async fn get_active_routing_configuration( + &self, + _request: Request, + ) -> Result, Status> { + let mut cache = self + .services + .cache + .get() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let span = info_span!("cache.get"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "get"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + let routing_config = cache + .get::<_, Vec>(CacheKey::ActiveRouting) + .instrument(span) + .await + .map(|value| { + if !value.is_empty() { + RoutingConfiguration::decode(value.as_ref()).ok() + } else { + None + } + }); + + if let Ok(Some(routing_config)) = routing_config { + if routing_config.active { + return Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(routing_config), + })); + } + } + + let span = info_span!("db.get.routing.active"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "select"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + + let config = sqlx::query_as!( + RoutingRow, + r#"select id, configuration as "configuration: sqlx::types::Json" from routing where + configuration->>'active' = 'true'"#, + ) + .fetch_optional(&self.services.postgres) + .instrument(span) + .await.map_err(|e| tonic::Status::internal(e.to_string()))?; + + let config = config.map(|transaction| { + debug!(id = ?transaction.id, "found active config"); + transaction.configuration.0 + }); + + match config { + Some(config) => { + let bytes = config.encode_to_vec(); + let span = info_span!("cache.set"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "set"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active"); + + if let Err(e) = cache + .set::<_, _, ()>(CacheKey::ActiveRouting, bytes) + .instrument(span) + .await + { + warn!("{e}"); + }; + + Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(config), + })) + } + None => Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: None, + })), + } + } +} -- cgit v1.2.3