aboutsummaryrefslogtreecommitdiffstats
path: root/crates/configuration/src/state/routing
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-12 05:13:32 +0200
committerrtkay123 <dev@kanjala.com>2025-08-12 05:13:32 +0200
commitc5ea875f544824b0c042bf7c0a58b3134f9c0373 (patch)
tree4913d4ff2b408c7157e33894e40deec570ecce9e /crates/configuration/src/state/routing
parent9c850d6c4d0ed468709c2eb5340d7b64bbb9aa68 (diff)
downloadwarden-c5ea875f544824b0c042bf7c0a58b3134f9c0373.tar.bz2
warden-c5ea875f544824b0c042bf7c0a58b3134f9c0373.zip
feat(config): get active routing
Diffstat (limited to 'crates/configuration/src/state/routing')
-rw-r--r--crates/configuration/src/state/routing/query.rs106
1 files changed, 106 insertions, 0 deletions
diff --git a/crates/configuration/src/state/routing/query.rs b/crates/configuration/src/state/routing/query.rs
new file mode 100644
index 0000000..3c6814d
--- /dev/null
+++ b/crates/configuration/src/state/routing/query.rs
@@ -0,0 +1,106 @@
+use opentelemetry_semantic_conventions::attribute;
+use prost::Message;
+use tonic::{Request, Status};
+use tracing::{Instrument, debug, info_span, instrument, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_stack::redis::AsyncCommands;
+
+use uuid::Uuid;
+use warden_core::{
+ configuration::routing::{
+ GetActiveRoutingResponse, RoutingConfiguration, query_routing_server::QueryRouting,
+ },
+ google,
+};
+
+use crate::state::{AppHandle, cache_key::CacheKey};
+
+pub struct RoutingRow {
+ id: Uuid,
+ configuration: sqlx::types::Json<RoutingConfiguration>,
+}
+
+#[tonic::async_trait]
+impl QueryRouting for AppHandle {
+ #[instrument(skip(self, _request), Err(Debug))]
+ async fn get_active_routing_configuration(
+ &self,
+ _request: Request<google::protobuf::Empty>,
+ ) -> Result<tonic::Response<GetActiveRoutingResponse>, Status> {
+ let mut cache = self
+ .services
+ .cache
+ .get()
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let span = info_span!("cache.get");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "get");
+ span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active");
+ let routing_config = cache
+ .get::<_, Vec<u8>>(CacheKey::ActiveRouting)
+ .instrument(span)
+ .await
+ .map(|value| {
+ if !value.is_empty() {
+ RoutingConfiguration::decode(value.as_ref()).ok()
+ } else {
+ None
+ }
+ });
+
+ if let Ok(Some(routing_config)) = routing_config {
+ if routing_config.active {
+ return Ok(tonic::Response::new(GetActiveRoutingResponse {
+ configuration: Some(routing_config),
+ }));
+ }
+ }
+
+ let span = info_span!("db.get.routing.active");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "select");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+ span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active");
+
+ let config = sqlx::query_as!(
+ RoutingRow,
+ r#"select id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>" from routing where
+ configuration->>'active' = 'true'"#,
+ )
+ .fetch_optional(&self.services.postgres)
+ .instrument(span)
+ .await.map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let config = config.map(|transaction| {
+ debug!(id = ?transaction.id, "found active config");
+ transaction.configuration.0
+ });
+
+ match config {
+ Some(config) => {
+ let bytes = config.encode_to_vec();
+ let span = info_span!("cache.set");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "set");
+ span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active");
+
+ if let Err(e) = cache
+ .set::<_, _, ()>(CacheKey::ActiveRouting, bytes)
+ .instrument(span)
+ .await
+ {
+ warn!("{e}");
+ };
+
+ Ok(tonic::Response::new(GetActiveRoutingResponse {
+ configuration: Some(config),
+ }))
+ }
+ None => Ok(tonic::Response::new(GetActiveRoutingResponse {
+ configuration: None,
+ })),
+ }
+ }
+}