aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-14 18:05:07 +0200
committerrtkay123 <dev@kanjala.com>2025-08-14 18:05:07 +0200
commit1b8c6886f6d22f9c61e978b42d066dce91e334dc (patch)
tree468fae725d2464baac9b27d6a02f67f24e3eeb9f
parent600c7a1942c06c0f7a0ae87448595057206bf324 (diff)
downloadwarden-1b8c6886f6d22f9c61e978b42d066dce91e334dc.tar.bz2
warden-1b8c6886f6d22f9c61e978b42d066dce91e334dc.zip
feat(config): rule grpc
-rw-r--r--crates/configuration/migrations/20250814155222_rule.sql11
-rw-r--r--crates/configuration/src/server.rs18
-rw-r--r--crates/configuration/src/server/http_svc/routes.rs5
-rw-r--r--crates/configuration/src/server/http_svc/routes/routing.rs2
-rw-r--r--crates/configuration/src/server/http_svc/routes/routing/get_active.rs1
-rw-r--r--crates/configuration/src/server/http_svc/routes/routing/post_routing.rs39
-rw-r--r--crates/configuration/src/state.rs8
-rw-r--r--crates/configuration/src/state/cache_key.rs2
-rw-r--r--crates/configuration/src/state/routing/mutate_routing.rs23
-rw-r--r--crates/configuration/src/state/routing/query_routing.rs16
-rw-r--r--crates/configuration/src/state/rule.rs24
-rw-r--r--crates/configuration/src/state/rule/mutate_rule.rs154
-rw-r--r--crates/configuration/src/state/rule/query_rule.rs97
13 files changed, 384 insertions, 16 deletions
diff --git a/crates/configuration/migrations/20250814155222_rule.sql b/crates/configuration/migrations/20250814155222_rule.sql
new file mode 100644
index 0000000..09f216a
--- /dev/null
+++ b/crates/configuration/migrations/20250814155222_rule.sql
@@ -0,0 +1,11 @@
+create table rule (
+ uuid uuid primary key,
+ configuration jsonb not null,
+ id text generated always as (
+ configuration->>'id'
+ ) stored,
+ version text generated always as (
+ configuration->>'version'
+ ) stored,
+ unique (id, version)
+);
diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs
index dec0813..28d6dd8 100644
--- a/crates/configuration/src/server.rs
+++ b/crates/configuration/src/server.rs
@@ -10,8 +10,14 @@ use tonic::service::Routes;
use tower_http::trace::TraceLayer;
use warden_core::{
FILE_DESCRIPTOR_SET,
- configuration::routing::{
- mutate_routing_server::MutateRoutingServer, query_routing_server::QueryRoutingServer,
+ configuration::{
+ routing::{
+ mutate_routing_server::MutateRoutingServer, query_routing_server::QueryRoutingServer,
+ },
+ rule::{
+ mutate_rule_configuration_server::MutateRuleConfigurationServer,
+ query_rule_configuration_server::QueryRuleConfigurationServer,
+ },
},
};
@@ -31,6 +37,14 @@ pub fn serve(state: AppHandle) -> Result<(axum::Router, axum::Router), AppError>
state.clone(),
MyInterceptor,
))
+ .add_service(MutateRuleConfigurationServer::with_interceptor(
+ state.clone(),
+ MyInterceptor,
+ ))
+ .add_service(QueryRuleConfigurationServer::with_interceptor(
+ state.clone(),
+ MyInterceptor,
+ ))
.add_service(routing_reflector)
.into_axum_router()
.layer(
diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs
index e70dccd..bec7c77 100644
--- a/crates/configuration/src/server/http_svc/routes.rs
+++ b/crates/configuration/src/server/http_svc/routes.rs
@@ -6,6 +6,9 @@ use crate::state::AppHandle;
pub fn router(store: AppHandle) -> OpenApiRouter {
OpenApiRouter::new()
- .routes(routes!(routing::get_active::active_routing))
+ .routes(routes!(
+ routing::get_active::active_routing,
+ routing::post_routing::post_routing
+ ))
.with_state(store)
}
diff --git a/crates/configuration/src/server/http_svc/routes/routing.rs b/crates/configuration/src/server/http_svc/routes/routing.rs
index 7c8affe..9013494 100644
--- a/crates/configuration/src/server/http_svc/routes/routing.rs
+++ b/crates/configuration/src/server/http_svc/routes/routing.rs
@@ -1,4 +1,4 @@
//pub mod delete_routing;
pub mod get_active;
-//pub mod post_routing;
+pub mod post_routing;
//pub mod replace_routing;
diff --git a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
index 9dc22e3..4875a80 100644
--- a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
+++ b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
@@ -10,6 +10,7 @@ use crate::{
state::AppHandle,
};
+/// Get active routing configuration
#[utoipa::path(
get,
responses((
diff --git a/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs b/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs
new file mode 100644
index 0000000..3578b65
--- /dev/null
+++ b/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs
@@ -0,0 +1,39 @@
+use axum::{extract::State, response::IntoResponse};
+use warden_core::configuration::routing::{
+ RoutingConfiguration, mutate_routing_server::MutateRouting,
+};
+
+use crate::{
+ server::{error::AppError, http_svc::TAG_ROUTING, version::Version},
+ state::AppHandle,
+};
+
+/// Create routing configuration
+#[utoipa::path(
+ post,
+ responses((
+ status = CREATED,
+ body = RoutingConfiguration
+ )),
+ operation_id = "post_routing_configuration", // https://github.com/juhaku/utoipa/issues/1170
+ path = "/{version}/routing",
+ params(
+ ("version" = Version, Path, description = "API version, e.g., v1, v2, v3")
+ ),
+ tag = TAG_ROUTING,
+)
+]
+#[axum::debug_handler]
+#[tracing::instrument(skip(state), err(Debug), fields(method = "POST"))]
+pub async fn post_routing(
+ version: Version,
+ State(state): State<AppHandle>,
+ axum::Json(body): axum::Json<RoutingConfiguration>,
+) -> Result<impl IntoResponse, AppError> {
+ let response = state
+ .create_routing_configuration(tonic::Request::new(body))
+ .await?
+ .into_inner();
+
+ Ok((axum::http::StatusCode::CREATED, axum::Json(response)))
+}
diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs
index 5a51d5b..de58d4b 100644
--- a/crates/configuration/src/state.rs
+++ b/crates/configuration/src/state.rs
@@ -1,5 +1,6 @@
mod cache_key;
mod routing;
+mod rule;
use async_nats::jetstream::Context;
use sqlx::PgPool;
@@ -44,6 +45,13 @@ impl AppState {
) -> Result<AppHandle, AppError> {
let local_config: LocalConfig = serde_json::from_value(configuration.misc.clone())?;
+ local_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .ok_or_else(|| anyhow::anyhow!("expected a dot separated config for nats subjects"))?;
+
create_stream(&services.jetstream, &local_config.nats).await?;
Ok(AppHandle(Arc::new(Self {
diff --git a/crates/configuration/src/state/cache_key.rs b/crates/configuration/src/state/cache_key.rs
index a63b15d..a99700e 100644
--- a/crates/configuration/src/state/cache_key.rs
+++ b/crates/configuration/src/state/cache_key.rs
@@ -4,6 +4,7 @@ use warden_stack::redis::ToRedisArgs;
pub enum CacheKey<'a> {
ActiveRouting,
Routing(&'a uuid::Uuid),
+ Rule { id: &'a str, version: &'a str },
}
impl ToRedisArgs for CacheKey<'_> {
@@ -14,6 +15,7 @@ impl ToRedisArgs for CacheKey<'_> {
let value = match self {
CacheKey::ActiveRouting => "routing.active".into(),
CacheKey::Routing(uuid) => format!("routing.{uuid}"),
+ CacheKey::Rule { id, version } => format!("rule.{id}.{version}"),
};
out.write_arg(value.as_bytes());
diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs
index 0c0637a..105cf18 100644
--- a/crates/configuration/src/state/routing/mutate_routing.rs
+++ b/crates/configuration/src/state/routing/mutate_routing.rs
@@ -1,6 +1,6 @@
use opentelemetry_semantic_conventions::attribute;
use tonic::{Request, Response, Status, async_trait};
-use tracing::{Instrument, error, info_span, instrument};
+use tracing::{Instrument, error, info_span, instrument, trace};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;
use warden_core::configuration::{
@@ -26,11 +26,14 @@ impl MutateRouting for AppHandle {
&self,
request: Request<RoutingConfiguration>,
) -> Result<Response<RoutingConfiguration>, Status> {
+ trace!("creating routing configuration");
+
let request = request.into_inner();
let span = info_span!("create.configuration.routing");
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+ span.set_attribute("otel.kind", "client");
sqlx::query!(
"insert into routing (id, configuration) values ($1, $2)",
@@ -42,8 +45,9 @@ impl MutateRouting for AppHandle {
.await
.map_err(|e| {
error!("{e}");
- tonic::Status::internal(e.to_string())
+ tonic::Status::internal("database error")
})?;
+ trace!("configuration created");
Ok(tonic::Response::new(request))
}
@@ -58,7 +62,7 @@ impl MutateRouting for AppHandle {
.subject
.split(".")
.next()
- .expect("bad config");
+ .expect("checked on startup");
let request = request.into_inner();
let id = Uuid::parse_str(&request.id)
@@ -70,6 +74,9 @@ impl MutateRouting for AppHandle {
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
span.set_attribute(attribute::DB_OPERATION_NAME, "update");
span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+ span.set_attribute("otel.kind", "client");
+
+ trace!("updating configuration");
let updated = sqlx::query_as!(
RoutingRow,
@@ -87,8 +94,9 @@ impl MutateRouting for AppHandle {
.await
.map_err(|e| {
error!("{e}");
- tonic::Status::internal(e.to_string())
+ tonic::Status::internal("database is not ready")
})?;
+ trace!("configuration updated");
let (_del_result, _publish_result) = tokio::try_join!(
invalidate_cache(self, CacheKey::Routing(&id)),
@@ -110,7 +118,7 @@ impl MutateRouting for AppHandle {
.subject
.split(".")
.next()
- .expect("bad config");
+ .expect("checked on startup");
let request = request.into_inner();
let id = Uuid::parse_str(&request.id)
@@ -120,6 +128,8 @@ impl MutateRouting for AppHandle {
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
span.set_attribute(attribute::DB_OPERATION_NAME, "delete");
span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+ span.set_attribute("otel.kind", "client");
+ trace!("deleting configuration");
let updated = sqlx::query_as!(
RoutingRow,
@@ -135,8 +145,9 @@ impl MutateRouting for AppHandle {
.await
.map_err(|e| {
error!("{e}");
- tonic::Status::internal(e.to_string())
+ tonic::Status::internal("database is not ready")
})?;
+ trace!("configuration deleted");
let (_del_result, _publish_result) = tokio::try_join!(
invalidate_cache(self, CacheKey::Routing(&id)),
diff --git a/crates/configuration/src/state/routing/query_routing.rs b/crates/configuration/src/state/routing/query_routing.rs
index 3c6814d..82c8d1c 100644
--- a/crates/configuration/src/state/routing/query_routing.rs
+++ b/crates/configuration/src/state/routing/query_routing.rs
@@ -38,6 +38,8 @@ impl QueryRouting for AppHandle {
span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
span.set_attribute(attribute::DB_OPERATION_NAME, "get");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active");
+ span.set_attribute("otel.kind", "client");
+
let routing_config = cache
.get::<_, Vec<u8>>(CacheKey::ActiveRouting)
.instrument(span)
@@ -50,12 +52,12 @@ impl QueryRouting for AppHandle {
}
});
- if let Ok(Some(routing_config)) = routing_config {
- if routing_config.active {
- return Ok(tonic::Response::new(GetActiveRoutingResponse {
- configuration: Some(routing_config),
- }));
- }
+ if let Ok(Some(routing_config)) = routing_config
+ && routing_config.active
+ {
+ return Ok(tonic::Response::new(GetActiveRoutingResponse {
+ configuration: Some(routing_config),
+ }));
}
let span = info_span!("db.get.routing.active");
@@ -63,6 +65,7 @@ impl QueryRouting for AppHandle {
span.set_attribute(attribute::DB_OPERATION_NAME, "select");
span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active");
+ span.set_attribute("otel.kind", "client");
let config = sqlx::query_as!(
RoutingRow,
@@ -85,6 +88,7 @@ impl QueryRouting for AppHandle {
span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
span.set_attribute(attribute::DB_OPERATION_NAME, "set");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active");
+ span.set_attribute("otel.kind", "client");
if let Err(e) = cache
.set::<_, _, ()>(CacheKey::ActiveRouting, bytes)
diff --git a/crates/configuration/src/state/rule.rs b/crates/configuration/src/state/rule.rs
new file mode 100644
index 0000000..4038865
--- /dev/null
+++ b/crates/configuration/src/state/rule.rs
@@ -0,0 +1,24 @@
+use uuid::Uuid;
+use warden_core::configuration::rule::{RuleConfiguration, RuleConfigurationRequest};
+
+use crate::state::cache_key::CacheKey;
+
+mod mutate_rule;
+mod query_rule;
+
+#[allow(dead_code)]
+pub struct RuleRow {
+ pub uuid: Uuid,
+ pub id: Option<String>,
+ pub version: Option<String>,
+ pub configuration: sqlx::types::Json<RuleConfiguration>,
+}
+
+impl<'a> From<&'a RuleConfigurationRequest> for CacheKey<'a> {
+ fn from(value: &'a RuleConfigurationRequest) -> Self {
+ Self::Rule {
+ id: &value.id,
+ version: &value.version,
+ }
+ }
+}
diff --git a/crates/configuration/src/state/rule/mutate_rule.rs b/crates/configuration/src/state/rule/mutate_rule.rs
new file mode 100644
index 0000000..7b853aa
--- /dev/null
+++ b/crates/configuration/src/state/rule/mutate_rule.rs
@@ -0,0 +1,154 @@
+use opentelemetry_semantic_conventions::attribute;
+use tonic::{Request, Response, Status, async_trait};
+use tracing::{Instrument, error, info_span};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use uuid::Uuid;
+use warden_core::configuration::{
+ ReloadEvent,
+ rule::{
+ DeleteRuleConfigurationRequest, RuleConfiguration, UpdateRuleRequest,
+ mutate_rule_configuration_server::MutateRuleConfiguration,
+ },
+};
+
+use crate::state::{
+ AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload, rule::RuleRow,
+};
+
+#[async_trait]
+impl MutateRuleConfiguration for AppHandle {
+ async fn create_rule_configuration(
+ &self,
+ request: Request<RuleConfiguration>,
+ ) -> Result<Response<RuleConfiguration>, Status> {
+ let request = request.into_inner();
+ let span = info_span!("create.configuration.rule");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "rule");
+ span.set_attribute("otel.kind", "client");
+
+ sqlx::query!(
+ "insert into rule (uuid, configuration) values ($1, $2)",
+ Uuid::now_v7(),
+ sqlx::types::Json(&request) as _,
+ )
+ .execute(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ Ok(tonic::Response::new(request))
+ }
+
+ async fn update_rule_configuration(
+ &self,
+ request: Request<UpdateRuleRequest>,
+ ) -> Result<Response<RuleConfiguration>, Status> {
+ let conf = self
+ .app_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .expect("checked on startup");
+
+ let request = request.into_inner();
+
+ let config = request.configuration.expect("configuration to be provided");
+
+ let span = info_span!("update.configuration.routing");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "update");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "rule");
+ span.set_attribute("otel.kind", "client");
+
+ sqlx::query!(
+ r#"
+ update rule
+ set configuration = $1
+ where id = $2 and version = $3
+ "#,
+ sqlx::types::Json(&config) as _,
+ config.id,
+ config.id,
+ )
+ .execute(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ let (_del_result, _publish_result) = tokio::try_join!(
+ invalidate_cache(
+ self,
+ CacheKey::Rule {
+ id: &config.id,
+ version: &config.version,
+ }
+ ),
+ publish_reload(self, conf, ReloadEvent::Rule)
+ )?;
+
+ Ok(Response::new(config))
+ }
+
+ async fn delete_rule_configuration(
+ &self,
+ request: Request<DeleteRuleConfigurationRequest>,
+ ) -> Result<Response<RuleConfiguration>, Status> {
+ let conf = self
+ .app_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .expect("bad config");
+
+ let request = request.into_inner();
+
+ let span = info_span!("delete.configuration.rule");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "delete");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "rule");
+ span.set_attribute("otel.kind", "client");
+
+ let updated = sqlx::query_as!(
+ RuleRow,
+ r#"
+ delete from rule
+ where id = $1 and version = $2
+ returning id, uuid, version, configuration as "configuration: sqlx::types::Json<RuleConfiguration>"
+ "#,
+ request.id,
+ request.version,
+ )
+ .fetch_one(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ let (_del_result, _publish_result) = tokio::try_join!(
+ invalidate_cache(
+ self,
+ CacheKey::Rule {
+ id: &request.id,
+ version: &request.version,
+ }
+ ),
+ publish_reload(self, conf, ReloadEvent::Rule)
+ )?;
+
+ let res = updated.configuration.0;
+
+ Ok(Response::new(res))
+ }
+}
diff --git a/crates/configuration/src/state/rule/query_rule.rs b/crates/configuration/src/state/rule/query_rule.rs
new file mode 100644
index 0000000..145de5c
--- /dev/null
+++ b/crates/configuration/src/state/rule/query_rule.rs
@@ -0,0 +1,97 @@
+use opentelemetry_semantic_conventions::attribute;
+use prost::Message;
+use tonic::{Request, Response, Status, async_trait};
+use tracing::{Instrument, debug, info_span, instrument, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::configuration::rule::{
+ GetRuleConfigResponse, RuleConfiguration, RuleConfigurationRequest,
+ query_rule_configuration_server::QueryRuleConfiguration,
+};
+use warden_stack::redis::AsyncCommands;
+
+use crate::state::{AppHandle, cache_key::CacheKey, rule::RuleRow};
+
+#[async_trait]
+impl QueryRuleConfiguration for AppHandle {
+ #[instrument(skip(self, request), Err(Debug))]
+ async fn get_rule_configuration(
+ &self,
+ request: Request<RuleConfigurationRequest>,
+ ) -> Result<Response<GetRuleConfigResponse>, Status> {
+ let data = request.into_inner();
+ let mut cache = self
+ .services
+ .cache
+ .get()
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let key = CacheKey::from(&data);
+
+ let span = info_span!("cache.get.rule");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "get");
+ span.set_attribute("otel.kind", "client");
+
+ let configuration = cache
+ .get::<_, Vec<u8>>(&key)
+ .instrument(span)
+ .await
+ .map(|value| {
+ if !value.is_empty() {
+ RuleConfiguration::decode(value.as_ref()).ok()
+ } else {
+ None
+ }
+ });
+
+ if let Ok(Some(rule_config)) = configuration {
+ return Ok(tonic::Response::new(GetRuleConfigResponse {
+ configuration: Some(rule_config),
+ }));
+ }
+
+ let span = info_span!("get.rule");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "select");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "rule");
+ span.set_attribute("otel.kind", "client");
+
+ let config = sqlx::query_as!(
+ RuleRow,
+ r#"select uuid, version, id, configuration as "configuration: sqlx::types::Json<RuleConfiguration>" from rule where
+ id = $1 and version = $2"#,
+ data.id,
+ data.version,
+ )
+ .fetch_optional(&self.services.postgres)
+ .instrument(span)
+ .await.map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let config = config.map(|transaction| {
+ debug!(id = ?transaction.id, "found config");
+ transaction.configuration.0
+ });
+
+ match config {
+ Some(config) => {
+ let bytes = config.encode_to_vec();
+ let span = info_span!("cache.set");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "set");
+ span.set_attribute("otel.kind", "client");
+
+ if let Err(e) = cache.set::<_, _, ()>(&key, bytes).instrument(span).await {
+ warn!("{e}");
+ };
+
+ Ok(tonic::Response::new(GetRuleConfigResponse {
+ configuration: Some(config),
+ }))
+ }
+ None => Ok(tonic::Response::new(GetRuleConfigResponse {
+ configuration: None,
+ })),
+ }
+ }
+}