From 1b8c6886f6d22f9c61e978b42d066dce91e334dc Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Thu, 14 Aug 2025 18:05:07 +0200 Subject: feat(config): rule grpc --- crates/configuration/src/state/cache_key.rs | 2 + .../src/state/routing/mutate_routing.rs | 23 ++- .../src/state/routing/query_routing.rs | 16 ++- crates/configuration/src/state/rule.rs | 24 ++++ crates/configuration/src/state/rule/mutate_rule.rs | 154 +++++++++++++++++++++ crates/configuration/src/state/rule/query_rule.rs | 97 +++++++++++++ 6 files changed, 304 insertions(+), 12 deletions(-) create mode 100644 crates/configuration/src/state/rule.rs create mode 100644 crates/configuration/src/state/rule/mutate_rule.rs create mode 100644 crates/configuration/src/state/rule/query_rule.rs (limited to 'crates/configuration/src/state') diff --git a/crates/configuration/src/state/cache_key.rs b/crates/configuration/src/state/cache_key.rs index a63b15d..a99700e 100644 --- a/crates/configuration/src/state/cache_key.rs +++ b/crates/configuration/src/state/cache_key.rs @@ -4,6 +4,7 @@ use warden_stack::redis::ToRedisArgs; pub enum CacheKey<'a> { ActiveRouting, Routing(&'a uuid::Uuid), + Rule { id: &'a str, version: &'a str }, } impl ToRedisArgs for CacheKey<'_> { @@ -14,6 +15,7 @@ impl ToRedisArgs for CacheKey<'_> { let value = match self { CacheKey::ActiveRouting => "routing.active".into(), CacheKey::Routing(uuid) => format!("routing.{uuid}"), + CacheKey::Rule { id, version } => format!("rule.{id}.{version}"), }; out.write_arg(value.as_bytes()); diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs index 0c0637a..105cf18 100644 --- a/crates/configuration/src/state/routing/mutate_routing.rs +++ b/crates/configuration/src/state/routing/mutate_routing.rs @@ -1,6 +1,6 @@ use opentelemetry_semantic_conventions::attribute; use tonic::{Request, Response, Status, async_trait}; -use tracing::{Instrument, error, info_span, instrument}; +use tracing::{Instrument, error, info_span, instrument, trace}; use tracing_opentelemetry::OpenTelemetrySpanExt; use uuid::Uuid; use warden_core::configuration::{ @@ -26,11 +26,14 @@ impl MutateRouting for AppHandle { &self, request: Request, ) -> Result, Status> { + trace!("creating routing configuration"); + let request = request.into_inner(); let span = info_span!("create.configuration.routing"); span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + span.set_attribute("otel.kind", "client"); sqlx::query!( "insert into routing (id, configuration) values ($1, $2)", @@ -42,8 +45,9 @@ impl MutateRouting for AppHandle { .await .map_err(|e| { error!("{e}"); - tonic::Status::internal(e.to_string()) + tonic::Status::internal("database error") })?; + trace!("configuration created"); Ok(tonic::Response::new(request)) } @@ -58,7 +62,7 @@ impl MutateRouting for AppHandle { .subject .split(".") .next() - .expect("bad config"); + .expect("checked on startup"); let request = request.into_inner(); let id = Uuid::parse_str(&request.id) @@ -70,6 +74,9 @@ impl MutateRouting for AppHandle { span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); span.set_attribute(attribute::DB_OPERATION_NAME, "update"); span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + span.set_attribute("otel.kind", "client"); + + trace!("updating configuration"); let updated = sqlx::query_as!( RoutingRow, @@ -87,8 +94,9 @@ impl MutateRouting for AppHandle { .await .map_err(|e| { error!("{e}"); - tonic::Status::internal(e.to_string()) + tonic::Status::internal("database is not ready") })?; + trace!("configuration updated"); let (_del_result, _publish_result) = tokio::try_join!( invalidate_cache(self, CacheKey::Routing(&id)), @@ -110,7 +118,7 @@ impl MutateRouting for AppHandle { .subject .split(".") .next() - .expect("bad config"); + .expect("checked on startup"); let request = request.into_inner(); let id = Uuid::parse_str(&request.id) @@ -120,6 +128,8 @@ impl MutateRouting for AppHandle { span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); span.set_attribute(attribute::DB_OPERATION_NAME, "delete"); span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + span.set_attribute("otel.kind", "client"); + trace!("deleting configuration"); let updated = sqlx::query_as!( RoutingRow, @@ -135,8 +145,9 @@ impl MutateRouting for AppHandle { .await .map_err(|e| { error!("{e}"); - tonic::Status::internal(e.to_string()) + tonic::Status::internal("database is not ready") })?; + trace!("configuration deleted"); let (_del_result, _publish_result) = tokio::try_join!( invalidate_cache(self, CacheKey::Routing(&id)), diff --git a/crates/configuration/src/state/routing/query_routing.rs b/crates/configuration/src/state/routing/query_routing.rs index 3c6814d..82c8d1c 100644 --- a/crates/configuration/src/state/routing/query_routing.rs +++ b/crates/configuration/src/state/routing/query_routing.rs @@ -38,6 +38,8 @@ impl QueryRouting for AppHandle { span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); span.set_attribute(attribute::DB_OPERATION_NAME, "get"); span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + span.set_attribute("otel.kind", "client"); + let routing_config = cache .get::<_, Vec>(CacheKey::ActiveRouting) .instrument(span) @@ -50,12 +52,12 @@ impl QueryRouting for AppHandle { } }); - if let Ok(Some(routing_config)) = routing_config { - if routing_config.active { - return Ok(tonic::Response::new(GetActiveRoutingResponse { - configuration: Some(routing_config), - })); - } + if let Ok(Some(routing_config)) = routing_config + && routing_config.active + { + return Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(routing_config), + })); } let span = info_span!("db.get.routing.active"); @@ -63,6 +65,7 @@ impl QueryRouting for AppHandle { span.set_attribute(attribute::DB_OPERATION_NAME, "select"); span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + span.set_attribute("otel.kind", "client"); let config = sqlx::query_as!( RoutingRow, @@ -85,6 +88,7 @@ impl QueryRouting for AppHandle { span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); span.set_attribute(attribute::DB_OPERATION_NAME, "set"); span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active"); + span.set_attribute("otel.kind", "client"); if let Err(e) = cache .set::<_, _, ()>(CacheKey::ActiveRouting, bytes) diff --git a/crates/configuration/src/state/rule.rs b/crates/configuration/src/state/rule.rs new file mode 100644 index 0000000..4038865 --- /dev/null +++ b/crates/configuration/src/state/rule.rs @@ -0,0 +1,24 @@ +use uuid::Uuid; +use warden_core::configuration::rule::{RuleConfiguration, RuleConfigurationRequest}; + +use crate::state::cache_key::CacheKey; + +mod mutate_rule; +mod query_rule; + +#[allow(dead_code)] +pub struct RuleRow { + pub uuid: Uuid, + pub id: Option, + pub version: Option, + pub configuration: sqlx::types::Json, +} + +impl<'a> From<&'a RuleConfigurationRequest> for CacheKey<'a> { + fn from(value: &'a RuleConfigurationRequest) -> Self { + Self::Rule { + id: &value.id, + version: &value.version, + } + } +} diff --git a/crates/configuration/src/state/rule/mutate_rule.rs b/crates/configuration/src/state/rule/mutate_rule.rs new file mode 100644 index 0000000..7b853aa --- /dev/null +++ b/crates/configuration/src/state/rule/mutate_rule.rs @@ -0,0 +1,154 @@ +use opentelemetry_semantic_conventions::attribute; +use tonic::{Request, Response, Status, async_trait}; +use tracing::{Instrument, error, info_span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; +use warden_core::configuration::{ + ReloadEvent, + rule::{ + DeleteRuleConfigurationRequest, RuleConfiguration, UpdateRuleRequest, + mutate_rule_configuration_server::MutateRuleConfiguration, + }, +}; + +use crate::state::{ + AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload, rule::RuleRow, +}; + +#[async_trait] +impl MutateRuleConfiguration for AppHandle { + async fn create_rule_configuration( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let span = info_span!("create.configuration.rule"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "rule"); + span.set_attribute("otel.kind", "client"); + + sqlx::query!( + "insert into rule (uuid, configuration) values ($1, $2)", + Uuid::now_v7(), + sqlx::types::Json(&request) as _, + ) + .execute(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + Ok(tonic::Response::new(request)) + } + + async fn update_rule_configuration( + &self, + request: Request, + ) -> Result, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("checked on startup"); + + let request = request.into_inner(); + + let config = request.configuration.expect("configuration to be provided"); + + let span = info_span!("update.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "update"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "rule"); + span.set_attribute("otel.kind", "client"); + + sqlx::query!( + r#" + update rule + set configuration = $1 + where id = $2 and version = $3 + "#, + sqlx::types::Json(&config) as _, + config.id, + config.id, + ) + .execute(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache( + self, + CacheKey::Rule { + id: &config.id, + version: &config.version, + } + ), + publish_reload(self, conf, ReloadEvent::Rule) + )?; + + Ok(Response::new(config)) + } + + async fn delete_rule_configuration( + &self, + request: Request, + ) -> Result, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + + let span = info_span!("delete.configuration.rule"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "delete"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "rule"); + span.set_attribute("otel.kind", "client"); + + let updated = sqlx::query_as!( + RuleRow, + r#" + delete from rule + where id = $1 and version = $2 + returning id, uuid, version, configuration as "configuration: sqlx::types::Json" + "#, + request.id, + request.version, + ) + .fetch_one(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache( + self, + CacheKey::Rule { + id: &request.id, + version: &request.version, + } + ), + publish_reload(self, conf, ReloadEvent::Rule) + )?; + + let res = updated.configuration.0; + + Ok(Response::new(res)) + } +} diff --git a/crates/configuration/src/state/rule/query_rule.rs b/crates/configuration/src/state/rule/query_rule.rs new file mode 100644 index 0000000..145de5c --- /dev/null +++ b/crates/configuration/src/state/rule/query_rule.rs @@ -0,0 +1,97 @@ +use opentelemetry_semantic_conventions::attribute; +use prost::Message; +use tonic::{Request, Response, Status, async_trait}; +use tracing::{Instrument, debug, info_span, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::configuration::rule::{ + GetRuleConfigResponse, RuleConfiguration, RuleConfigurationRequest, + query_rule_configuration_server::QueryRuleConfiguration, +}; +use warden_stack::redis::AsyncCommands; + +use crate::state::{AppHandle, cache_key::CacheKey, rule::RuleRow}; + +#[async_trait] +impl QueryRuleConfiguration for AppHandle { + #[instrument(skip(self, request), Err(Debug))] + async fn get_rule_configuration( + &self, + request: Request, + ) -> Result, Status> { + let data = request.into_inner(); + let mut cache = self + .services + .cache + .get() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let key = CacheKey::from(&data); + + let span = info_span!("cache.get.rule"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "get"); + span.set_attribute("otel.kind", "client"); + + let configuration = cache + .get::<_, Vec>(&key) + .instrument(span) + .await + .map(|value| { + if !value.is_empty() { + RuleConfiguration::decode(value.as_ref()).ok() + } else { + None + } + }); + + if let Ok(Some(rule_config)) = configuration { + return Ok(tonic::Response::new(GetRuleConfigResponse { + configuration: Some(rule_config), + })); + } + + let span = info_span!("get.rule"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "select"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "rule"); + span.set_attribute("otel.kind", "client"); + + let config = sqlx::query_as!( + RuleRow, + r#"select uuid, version, id, configuration as "configuration: sqlx::types::Json" from rule where + id = $1 and version = $2"#, + data.id, + data.version, + ) + .fetch_optional(&self.services.postgres) + .instrument(span) + .await.map_err(|e| tonic::Status::internal(e.to_string()))?; + + let config = config.map(|transaction| { + debug!(id = ?transaction.id, "found config"); + transaction.configuration.0 + }); + + match config { + Some(config) => { + let bytes = config.encode_to_vec(); + let span = info_span!("cache.set"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "set"); + span.set_attribute("otel.kind", "client"); + + if let Err(e) = cache.set::<_, _, ()>(&key, bytes).instrument(span).await { + warn!("{e}"); + }; + + Ok(tonic::Response::new(GetRuleConfigResponse { + configuration: Some(config), + })) + } + None => Ok(tonic::Response::new(GetRuleConfigResponse { + configuration: None, + })), + } + } +} -- cgit v1.2.3