aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-16 00:47:43 +0200
committerrtkay123 <dev@kanjala.com>2025-08-16 00:47:43 +0200
commit000885c1d5a23eb353c3f490e32363010ca804d3 (patch)
tree74f320a969b45f765a4826f31ee88064822cdccd
parent4a82b6db8a1278588b97b874bad468ec6f7cda6c (diff)
downloadwarden-000885c1d5a23eb353c3f490e32363010ca804d3.tar.bz2
warden-000885c1d5a23eb353c3f490e32363010ca804d3.zip
feat(config): identify resource to reload
-rw-r--r--crates/configuration/src/state.rs4
-rw-r--r--crates/configuration/src/state/routing/mutate_routing.rs20
-rw-r--r--crates/configuration/src/state/rule/mutate_rule.rs22
-rw-r--r--crates/router/src/processor/reload.rs20
-rw-r--r--crates/rule-executor/src/processor/reload.rs38
-rw-r--r--proto/configuration/reload_event.proto8
6 files changed, 82 insertions, 30 deletions
diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs
index d8f22d5..f337a54 100644
--- a/crates/configuration/src/state.rs
+++ b/crates/configuration/src/state.rs
@@ -4,6 +4,7 @@ mod rule;
use async_nats::jetstream::Context;
use opentelemetry_semantic_conventions::attribute;
+use prost::Message;
use sqlx::PgPool;
use std::{ops::Deref, sync::Arc};
use tracing::{Instrument, info_span, instrument, trace};
@@ -91,10 +92,11 @@ pub async fn publish_reload(
span.set_attribute(attribute::MESSAGING_SYSTEM, "nats");
span.set_attribute("otel.kind", "producer");
+ let bytes = event.encode_to_vec();
state
.services
.jetstream
- .publish(format!("{prefix}.reload"), event.as_str_name().into())
+ .publish(format!("{prefix}.reload"), bytes.into())
.instrument(span)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs
index 105cf18..9542ba7 100644
--- a/crates/configuration/src/state/routing/mutate_routing.rs
+++ b/crates/configuration/src/state/routing/mutate_routing.rs
@@ -4,7 +4,7 @@ use tracing::{Instrument, error, info_span, instrument, trace};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;
use warden_core::configuration::{
- ReloadEvent,
+ ConfigKind, ReloadEvent,
routing::{
DeleteConfigurationRequest, RoutingConfiguration, UpdateRoutingRequest,
mutate_routing_server::MutateRouting,
@@ -100,7 +100,14 @@ impl MutateRouting for AppHandle {
let (_del_result, _publish_result) = tokio::try_join!(
invalidate_cache(self, CacheKey::Routing(&id)),
- publish_reload(self, conf, ReloadEvent::Routing)
+ publish_reload(
+ self,
+ conf,
+ ReloadEvent {
+ kind: ConfigKind::Routing.into(),
+ ..Default::default()
+ }
+ )
)?;
let res = updated.configuration.0;
@@ -151,7 +158,14 @@ impl MutateRouting for AppHandle {
let (_del_result, _publish_result) = tokio::try_join!(
invalidate_cache(self, CacheKey::Routing(&id)),
- publish_reload(self, conf, ReloadEvent::Routing)
+ publish_reload(
+ self,
+ conf,
+ ReloadEvent {
+ kind: ConfigKind::Routing.into(),
+ ..Default::default()
+ }
+ )
)?;
let res = updated.configuration.0;
diff --git a/crates/configuration/src/state/rule/mutate_rule.rs b/crates/configuration/src/state/rule/mutate_rule.rs
index 7b853aa..9c4f393 100644
--- a/crates/configuration/src/state/rule/mutate_rule.rs
+++ b/crates/configuration/src/state/rule/mutate_rule.rs
@@ -4,7 +4,7 @@ use tracing::{Instrument, error, info_span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;
use warden_core::configuration::{
- ReloadEvent,
+ ConfigKind, ReloadEvent,
rule::{
DeleteRuleConfigurationRequest, RuleConfiguration, UpdateRuleRequest,
mutate_rule_configuration_server::MutateRuleConfiguration,
@@ -92,7 +92,15 @@ impl MutateRuleConfiguration for AppHandle {
version: &config.version,
}
),
- publish_reload(self, conf, ReloadEvent::Rule)
+ publish_reload(
+ self,
+ conf,
+ ReloadEvent {
+ kind: ConfigKind::Rule.into(),
+ id: Some(config.id.to_owned()),
+ version: Some(config.version.to_owned()),
+ }
+ )
)?;
Ok(Response::new(config))
@@ -144,7 +152,15 @@ impl MutateRuleConfiguration for AppHandle {
version: &request.version,
}
),
- publish_reload(self, conf, ReloadEvent::Rule)
+ publish_reload(
+ self,
+ conf,
+ ReloadEvent {
+ kind: ConfigKind::Rule.into(),
+ id: Some(request.id.to_owned()),
+ version: Some(request.version.to_owned()),
+ }
+ )
)?;
let res = updated.configuration.0;
diff --git a/crates/router/src/processor/reload.rs b/crates/router/src/processor/reload.rs
index 900b7ce..e15ca07 100644
--- a/crates/router/src/processor/reload.rs
+++ b/crates/router/src/processor/reload.rs
@@ -1,8 +1,9 @@
use async_nats::jetstream::consumer;
use futures_util::StreamExt;
-use tracing::{debug, error, info, trace};
+use prost::Message as _;
+use tracing::{error, info, trace};
use uuid::Uuid;
-use warden_core::configuration::ReloadEvent;
+use warden_core::configuration::{ConfigKind, ReloadEvent};
use crate::state::AppHandle;
@@ -31,18 +32,19 @@ pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
while let Some(value) = messages.next().await {
match value {
Ok(message) => {
- trace!("got reload cache event",);
- if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec())
- .map(|value| ReloadEvent::from_str_name(&value))
+ trace!("got reload cache event");
+ if let Ok(res) = ReloadEvent::decode(message.payload.as_ref())
+ && let Ok(kind) = ConfigKind::try_from(res.kind)
{
- match event {
- ReloadEvent::Routing => {
+ match kind {
+ ConfigKind::Routing => {
+ trace!("update triggered, invalidating active routing config");
let local_cache = state.local_cache.write().await;
local_cache.invalidate_all();
let _ = message.ack().await.inspect_err(|e| error!("{e}"));
}
- _ => {
- debug!(event = ?event, "detected reload event, acknowledging");
+ ConfigKind::Rule => {
+ trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging");
let _ = message.ack().await.inspect_err(|e| error!("{e}"));
}
}
diff --git a/crates/rule-executor/src/processor/reload.rs b/crates/rule-executor/src/processor/reload.rs
index a111948..385c7ab 100644
--- a/crates/rule-executor/src/processor/reload.rs
+++ b/crates/rule-executor/src/processor/reload.rs
@@ -1,8 +1,9 @@
use async_nats::jetstream::consumer;
use futures_util::StreamExt;
-use tracing::{debug, error, info};
+use prost::Message as _;
+use tracing::{error, info, trace};
use uuid::Uuid;
-use warden_core::configuration::ReloadEvent;
+use warden_core::configuration::{ConfigKind, ReloadEvent, rule::RuleConfigurationRequest};
use crate::state::AppHandle;
@@ -31,26 +32,37 @@ pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
while let Some(value) = messages.next().await {
match value {
Ok(message) => {
- debug!("got reload cache event",);
- if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec())
- .map(|value| ReloadEvent::from_str_name(&value))
+ trace!("got reload cache event");
+ if let Ok(res) = ReloadEvent::decode(message.payload.as_ref())
+ && let Ok(kind) = ConfigKind::try_from(res.kind)
{
- match event {
- // TODO: find exact rule
- ReloadEvent::Rule => {
- let local_cache = state.local_cache.write().await;
- local_cache.invalidate_all();
+ match kind {
+ ConfigKind::Routing => {
+ trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging");
let _ = message.ack().await.inspect_err(|e| error!("{e}"));
}
- _ => {
- debug!(event = ?event, "detected reload event, acknowledging");
+ ConfigKind::Rule => {
+ let local_cache = state.local_cache.write().await;
+ let id = res.id();
+ let version = res.version();
+ trace!(
+ id = id,
+ ver = version,
+ "update triggered, invalidating rule config"
+ );
+ let key = RuleConfigurationRequest {
+ id: id.to_string(),
+ version: version.to_string(),
+ };
+
+ local_cache.invalidate(&key).await;
let _ = message.ack().await.inspect_err(|e| error!("{e}"));
}
}
}
}
Err(e) => {
- error!("{e}")
+ error!("{e:?}")
}
}
}
diff --git a/proto/configuration/reload_event.proto b/proto/configuration/reload_event.proto
index 569de80..b43b44b 100644
--- a/proto/configuration/reload_event.proto
+++ b/proto/configuration/reload_event.proto
@@ -2,7 +2,13 @@ syntax = "proto3";
package configuration;
-enum ReloadEvent {
+enum ConfigKind {
ROUTING = 0;
RULE = 1;
}
+
+message ReloadEvent {
+ ConfigKind kind = 1;
+ optional string id = 2;
+ optional string version = 3;
+}