From c5ea875f544824b0c042bf7c0a58b3134f9c0373 Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Tue, 12 Aug 2025 05:13:32 +0200 Subject: feat(config): get active routing --- crates/configuration/src/state/routing/query.rs | 106 ++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 crates/configuration/src/state/routing/query.rs (limited to 'crates/configuration/src/state/routing') diff --git a/crates/configuration/src/state/routing/query.rs b/crates/configuration/src/state/routing/query.rs new file mode 100644 index 0000000..3c6814d --- /dev/null +++ b/crates/configuration/src/state/routing/query.rs @@ -0,0 +1,106 @@ +use opentelemetry_semantic_conventions::attribute; +use prost::Message; +use tonic::{Request, Status}; +use tracing::{Instrument, debug, info_span, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_stack::redis::AsyncCommands; + +use uuid::Uuid; +use warden_core::{ + configuration::routing::{ + GetActiveRoutingResponse, RoutingConfiguration, query_routing_server::QueryRouting, + }, + google, +}; + +use crate::state::{AppHandle, cache_key::CacheKey}; + +pub struct RoutingRow { + id: Uuid, + configuration: sqlx::types::Json, +} + +#[tonic::async_trait] +impl QueryRouting for AppHandle { + #[instrument(skip(self, _request), Err(Debug))] + async fn get_active_routing_configuration( + &self, + _request: Request, + ) -> Result, Status> { + let mut cache = self + .services + .cache + .get() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let span = info_span!("cache.get"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "get"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + let routing_config = cache + .get::<_, Vec>(CacheKey::ActiveRouting) + .instrument(span) + .await + .map(|value| { + if !value.is_empty() { + RoutingConfiguration::decode(value.as_ref()).ok() + } else { + None + } + }); + + if let Ok(Some(routing_config)) = routing_config { + if routing_config.active { + return Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(routing_config), + })); + } + } + + let span = info_span!("db.get.routing.active"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "select"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + + let config = sqlx::query_as!( + RoutingRow, + r#"select id, configuration as "configuration: sqlx::types::Json" from routing where + configuration->>'active' = 'true'"#, + ) + .fetch_optional(&self.services.postgres) + .instrument(span) + .await.map_err(|e| tonic::Status::internal(e.to_string()))?; + + let config = config.map(|transaction| { + debug!(id = ?transaction.id, "found active config"); + transaction.configuration.0 + }); + + match config { + Some(config) => { + let bytes = config.encode_to_vec(); + let span = info_span!("cache.set"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "set"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active"); + + if let Err(e) = cache + .set::<_, _, ()>(CacheKey::ActiveRouting, bytes) + .instrument(span) + .await + { + warn!("{e}"); + }; + + Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(config), + })) + } + None => Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: None, + })), + } + } +} -- cgit v1.2.3