aboutsummaryrefslogtreecommitdiffstats
path: root/crates/configuration/src/state
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-14 18:05:07 +0200
committerrtkay123 <dev@kanjala.com>2025-08-14 18:05:07 +0200
commit1b8c6886f6d22f9c61e978b42d066dce91e334dc (patch)
tree468fae725d2464baac9b27d6a02f67f24e3eeb9f /crates/configuration/src/state
parent600c7a1942c06c0f7a0ae87448595057206bf324 (diff)
downloadwarden-1b8c6886f6d22f9c61e978b42d066dce91e334dc.tar.bz2
warden-1b8c6886f6d22f9c61e978b42d066dce91e334dc.zip
feat(config): rule grpc
Diffstat (limited to 'crates/configuration/src/state')
-rw-r--r--crates/configuration/src/state/cache_key.rs2
-rw-r--r--crates/configuration/src/state/routing/mutate_routing.rs23
-rw-r--r--crates/configuration/src/state/routing/query_routing.rs16
-rw-r--r--crates/configuration/src/state/rule.rs24
-rw-r--r--crates/configuration/src/state/rule/mutate_rule.rs154
-rw-r--r--crates/configuration/src/state/rule/query_rule.rs97
6 files changed, 304 insertions, 12 deletions
diff --git a/crates/configuration/src/state/cache_key.rs b/crates/configuration/src/state/cache_key.rs
index a63b15d..a99700e 100644
--- a/crates/configuration/src/state/cache_key.rs
+++ b/crates/configuration/src/state/cache_key.rs
@@ -4,6 +4,7 @@ use warden_stack::redis::ToRedisArgs;
pub enum CacheKey<'a> {
ActiveRouting,
Routing(&'a uuid::Uuid),
+ Rule { id: &'a str, version: &'a str },
}
impl ToRedisArgs for CacheKey<'_> {
@@ -14,6 +15,7 @@ impl ToRedisArgs for CacheKey<'_> {
let value = match self {
CacheKey::ActiveRouting => "routing.active".into(),
CacheKey::Routing(uuid) => format!("routing.{uuid}"),
+ CacheKey::Rule { id, version } => format!("rule.{id}.{version}"),
};
out.write_arg(value.as_bytes());
diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs
index 0c0637a..105cf18 100644
--- a/crates/configuration/src/state/routing/mutate_routing.rs
+++ b/crates/configuration/src/state/routing/mutate_routing.rs
@@ -1,6 +1,6 @@
use opentelemetry_semantic_conventions::attribute;
use tonic::{Request, Response, Status, async_trait};
-use tracing::{Instrument, error, info_span, instrument};
+use tracing::{Instrument, error, info_span, instrument, trace};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;
use warden_core::configuration::{
@@ -26,11 +26,14 @@ impl MutateRouting for AppHandle {
&self,
request: Request<RoutingConfiguration>,
) -> Result<Response<RoutingConfiguration>, Status> {
+ trace!("creating routing configuration");
+
let request = request.into_inner();
let span = info_span!("create.configuration.routing");
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+ span.set_attribute("otel.kind", "client");
sqlx::query!(
"insert into routing (id, configuration) values ($1, $2)",
@@ -42,8 +45,9 @@ impl MutateRouting for AppHandle {
.await
.map_err(|e| {
error!("{e}");
- tonic::Status::internal(e.to_string())
+ tonic::Status::internal("database error")
})?;
+ trace!("configuration created");
Ok(tonic::Response::new(request))
}
@@ -58,7 +62,7 @@ impl MutateRouting for AppHandle {
.subject
.split(".")
.next()
- .expect("bad config");
+ .expect("checked on startup");
let request = request.into_inner();
let id = Uuid::parse_str(&request.id)
@@ -70,6 +74,9 @@ impl MutateRouting for AppHandle {
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
span.set_attribute(attribute::DB_OPERATION_NAME, "update");
span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+ span.set_attribute("otel.kind", "client");
+
+ trace!("updating configuration");
let updated = sqlx::query_as!(
RoutingRow,
@@ -87,8 +94,9 @@ impl MutateRouting for AppHandle {
.await
.map_err(|e| {
error!("{e}");
- tonic::Status::internal(e.to_string())
+ tonic::Status::internal("database is not ready")
})?;
+ trace!("configuration updated");
let (_del_result, _publish_result) = tokio::try_join!(
invalidate_cache(self, CacheKey::Routing(&id)),
@@ -110,7 +118,7 @@ impl MutateRouting for AppHandle {
.subject
.split(".")
.next()
- .expect("bad config");
+ .expect("checked on startup");
let request = request.into_inner();
let id = Uuid::parse_str(&request.id)
@@ -120,6 +128,8 @@ impl MutateRouting for AppHandle {
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
span.set_attribute(attribute::DB_OPERATION_NAME, "delete");
span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+ span.set_attribute("otel.kind", "client");
+ trace!("deleting configuration");
let updated = sqlx::query_as!(
RoutingRow,
@@ -135,8 +145,9 @@ impl MutateRouting for AppHandle {
.await
.map_err(|e| {
error!("{e}");
- tonic::Status::internal(e.to_string())
+ tonic::Status::internal("database is not ready")
})?;
+ trace!("configuration deleted");
let (_del_result, _publish_result) = tokio::try_join!(
invalidate_cache(self, CacheKey::Routing(&id)),
diff --git a/crates/configuration/src/state/routing/query_routing.rs b/crates/configuration/src/state/routing/query_routing.rs
index 3c6814d..82c8d1c 100644
--- a/crates/configuration/src/state/routing/query_routing.rs
+++ b/crates/configuration/src/state/routing/query_routing.rs
@@ -38,6 +38,8 @@ impl QueryRouting for AppHandle {
span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
span.set_attribute(attribute::DB_OPERATION_NAME, "get");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active");
+ span.set_attribute("otel.kind", "client");
+
let routing_config = cache
.get::<_, Vec<u8>>(CacheKey::ActiveRouting)
.instrument(span)
@@ -50,12 +52,12 @@ impl QueryRouting for AppHandle {
}
});
- if let Ok(Some(routing_config)) = routing_config {
- if routing_config.active {
- return Ok(tonic::Response::new(GetActiveRoutingResponse {
- configuration: Some(routing_config),
- }));
- }
+ if let Ok(Some(routing_config)) = routing_config
+ && routing_config.active
+ {
+ return Ok(tonic::Response::new(GetActiveRoutingResponse {
+ configuration: Some(routing_config),
+ }));
}
let span = info_span!("db.get.routing.active");
@@ -63,6 +65,7 @@ impl QueryRouting for AppHandle {
span.set_attribute(attribute::DB_OPERATION_NAME, "select");
span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active");
+ span.set_attribute("otel.kind", "client");
let config = sqlx::query_as!(
RoutingRow,
@@ -85,6 +88,7 @@ impl QueryRouting for AppHandle {
span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
span.set_attribute(attribute::DB_OPERATION_NAME, "set");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active");
+ span.set_attribute("otel.kind", "client");
if let Err(e) = cache
.set::<_, _, ()>(CacheKey::ActiveRouting, bytes)
diff --git a/crates/configuration/src/state/rule.rs b/crates/configuration/src/state/rule.rs
new file mode 100644
index 0000000..4038865
--- /dev/null
+++ b/crates/configuration/src/state/rule.rs
@@ -0,0 +1,24 @@
+use uuid::Uuid;
+use warden_core::configuration::rule::{RuleConfiguration, RuleConfigurationRequest};
+
+use crate::state::cache_key::CacheKey;
+
+mod mutate_rule;
+mod query_rule;
+
+#[allow(dead_code)]
+pub struct RuleRow {
+ pub uuid: Uuid,
+ pub id: Option<String>,
+ pub version: Option<String>,
+ pub configuration: sqlx::types::Json<RuleConfiguration>,
+}
+
+impl<'a> From<&'a RuleConfigurationRequest> for CacheKey<'a> {
+ fn from(value: &'a RuleConfigurationRequest) -> Self {
+ Self::Rule {
+ id: &value.id,
+ version: &value.version,
+ }
+ }
+}
diff --git a/crates/configuration/src/state/rule/mutate_rule.rs b/crates/configuration/src/state/rule/mutate_rule.rs
new file mode 100644
index 0000000..7b853aa
--- /dev/null
+++ b/crates/configuration/src/state/rule/mutate_rule.rs
@@ -0,0 +1,154 @@
+use opentelemetry_semantic_conventions::attribute;
+use tonic::{Request, Response, Status, async_trait};
+use tracing::{Instrument, error, info_span};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use uuid::Uuid;
+use warden_core::configuration::{
+ ReloadEvent,
+ rule::{
+ DeleteRuleConfigurationRequest, RuleConfiguration, UpdateRuleRequest,
+ mutate_rule_configuration_server::MutateRuleConfiguration,
+ },
+};
+
+use crate::state::{
+ AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload, rule::RuleRow,
+};
+
+#[async_trait]
+impl MutateRuleConfiguration for AppHandle {
+ async fn create_rule_configuration(
+ &self,
+ request: Request<RuleConfiguration>,
+ ) -> Result<Response<RuleConfiguration>, Status> {
+ let request = request.into_inner();
+ let span = info_span!("create.configuration.rule");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "rule");
+ span.set_attribute("otel.kind", "client");
+
+ sqlx::query!(
+ "insert into rule (uuid, configuration) values ($1, $2)",
+ Uuid::now_v7(),
+ sqlx::types::Json(&request) as _,
+ )
+ .execute(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ Ok(tonic::Response::new(request))
+ }
+
+ async fn update_rule_configuration(
+ &self,
+ request: Request<UpdateRuleRequest>,
+ ) -> Result<Response<RuleConfiguration>, Status> {
+ let conf = self
+ .app_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .expect("checked on startup");
+
+ let request = request.into_inner();
+
+ let config = request.configuration.expect("configuration to be provided");
+
+ let span = info_span!("update.configuration.routing");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "update");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "rule");
+ span.set_attribute("otel.kind", "client");
+
+ sqlx::query!(
+ r#"
+ update rule
+ set configuration = $1
+ where id = $2 and version = $3
+ "#,
+ sqlx::types::Json(&config) as _,
+ config.id,
+ config.id,
+ )
+ .execute(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ let (_del_result, _publish_result) = tokio::try_join!(
+ invalidate_cache(
+ self,
+ CacheKey::Rule {
+ id: &config.id,
+ version: &config.version,
+ }
+ ),
+ publish_reload(self, conf, ReloadEvent::Rule)
+ )?;
+
+ Ok(Response::new(config))
+ }
+
+ async fn delete_rule_configuration(
+ &self,
+ request: Request<DeleteRuleConfigurationRequest>,
+ ) -> Result<Response<RuleConfiguration>, Status> {
+ let conf = self
+ .app_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .expect("bad config");
+
+ let request = request.into_inner();
+
+ let span = info_span!("delete.configuration.rule");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "delete");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "rule");
+ span.set_attribute("otel.kind", "client");
+
+ let updated = sqlx::query_as!(
+ RuleRow,
+ r#"
+ delete from rule
+ where id = $1 and version = $2
+ returning id, uuid, version, configuration as "configuration: sqlx::types::Json<RuleConfiguration>"
+ "#,
+ request.id,
+ request.version,
+ )
+ .fetch_one(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ let (_del_result, _publish_result) = tokio::try_join!(
+ invalidate_cache(
+ self,
+ CacheKey::Rule {
+ id: &request.id,
+ version: &request.version,
+ }
+ ),
+ publish_reload(self, conf, ReloadEvent::Rule)
+ )?;
+
+ let res = updated.configuration.0;
+
+ Ok(Response::new(res))
+ }
+}
diff --git a/crates/configuration/src/state/rule/query_rule.rs b/crates/configuration/src/state/rule/query_rule.rs
new file mode 100644
index 0000000..145de5c
--- /dev/null
+++ b/crates/configuration/src/state/rule/query_rule.rs
@@ -0,0 +1,97 @@
+use opentelemetry_semantic_conventions::attribute;
+use prost::Message;
+use tonic::{Request, Response, Status, async_trait};
+use tracing::{Instrument, debug, info_span, instrument, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::configuration::rule::{
+ GetRuleConfigResponse, RuleConfiguration, RuleConfigurationRequest,
+ query_rule_configuration_server::QueryRuleConfiguration,
+};
+use warden_stack::redis::AsyncCommands;
+
+use crate::state::{AppHandle, cache_key::CacheKey, rule::RuleRow};
+
+#[async_trait]
+impl QueryRuleConfiguration for AppHandle {
+ #[instrument(skip(self, request), Err(Debug))]
+ async fn get_rule_configuration(
+ &self,
+ request: Request<RuleConfigurationRequest>,
+ ) -> Result<Response<GetRuleConfigResponse>, Status> {
+ let data = request.into_inner();
+ let mut cache = self
+ .services
+ .cache
+ .get()
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let key = CacheKey::from(&data);
+
+ let span = info_span!("cache.get.rule");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "get");
+ span.set_attribute("otel.kind", "client");
+
+ let configuration = cache
+ .get::<_, Vec<u8>>(&key)
+ .instrument(span)
+ .await
+ .map(|value| {
+ if !value.is_empty() {
+ RuleConfiguration::decode(value.as_ref()).ok()
+ } else {
+ None
+ }
+ });
+
+ if let Ok(Some(rule_config)) = configuration {
+ return Ok(tonic::Response::new(GetRuleConfigResponse {
+ configuration: Some(rule_config),
+ }));
+ }
+
+ let span = info_span!("get.rule");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "select");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "rule");
+ span.set_attribute("otel.kind", "client");
+
+ let config = sqlx::query_as!(
+ RuleRow,
+ r#"select uuid, version, id, configuration as "configuration: sqlx::types::Json<RuleConfiguration>" from rule where
+ id = $1 and version = $2"#,
+ data.id,
+ data.version,
+ )
+ .fetch_optional(&self.services.postgres)
+ .instrument(span)
+ .await.map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let config = config.map(|transaction| {
+ debug!(id = ?transaction.id, "found config");
+ transaction.configuration.0
+ });
+
+ match config {
+ Some(config) => {
+ let bytes = config.encode_to_vec();
+ let span = info_span!("cache.set");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "set");
+ span.set_attribute("otel.kind", "client");
+
+ if let Err(e) = cache.set::<_, _, ()>(&key, bytes).instrument(span).await {
+ warn!("{e}");
+ };
+
+ Ok(tonic::Response::new(GetRuleConfigResponse {
+ configuration: Some(config),
+ }))
+ }
+ None => Ok(tonic::Response::new(GetRuleConfigResponse {
+ configuration: None,
+ })),
+ }
+ }
+}