aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-12 14:00:28 +0200
committerrtkay123 <dev@kanjala.com>2025-08-12 14:00:28 +0200
commit1d347dd2142a266552812ac2f8844acf52d2dc1c (patch)
tree70beaeb0bf572b9ef7323cc98b7b52084011d2d8
parentc5ea875f544824b0c042bf7c0a58b3134f9c0373 (diff)
downloadwarden-1d347dd2142a266552812ac2f8844acf52d2dc1c.tar.bz2
warden-1d347dd2142a266552812ac2f8844acf52d2dc1c.zip
feat(config): reload config
-rw-r--r--.sqlx/query-1f6114577d8a5d358ad7a36b8e326e5ecf5fab54b0b145b31b5a644ecacb2b2e.json15
-rw-r--r--.sqlx/query-2a92509e9bdb7c3a7d16cd87adee04048afed140e124c021b1578b3f5969c3f7.json26
-rw-r--r--.sqlx/query-4fd63e6ec0ff3d61d790de01066341b25223bd39f1a51cc1abb60149e4a8092b.json28
-rw-r--r--.sqlx/query-5f721c625db72ec257080ff3f35fcd1bffbafaa3085f56d3ec1dc39704ac3935.json29
-rw-r--r--Cargo.lock2
-rw-r--r--crates/configuration/Cargo.toml9
-rw-r--r--crates/configuration/src/main.rs54
-rw-r--r--crates/configuration/src/server.rs46
-rw-r--r--crates/configuration/src/server/grpc_svc.rs24
-rw-r--r--crates/configuration/src/server/http_svc.rs2
-rw-r--r--crates/configuration/src/server/http_svc/routes.rs2
-rw-r--r--crates/configuration/src/server/http_svc/routes/routing/get_active.rs14
-rw-r--r--crates/configuration/src/server/interceptor.rs23
-rw-r--r--crates/configuration/src/server/reload_stream.rs21
-rw-r--r--crates/configuration/src/state.rs44
-rw-r--r--crates/configuration/src/state/routing.rs3
-rw-r--r--crates/configuration/src/state/routing/mutate_routing.rs150
-rw-r--r--crates/configuration/src/state/routing/query_routing.rs (renamed from crates/configuration/src/state/routing/query.rs)0
-rw-r--r--lib/warden-core/build.rs5
-rw-r--r--lib/warden-core/src/configuration.rs2
-rw-r--r--lib/warden-middleware/Cargo.toml1
-rw-r--r--lib/warden-middleware/src/trace_layer.rs3
-rw-r--r--proto/configuration/reload_event.proto7
23 files changed, 435 insertions, 75 deletions
diff --git a/.sqlx/query-1f6114577d8a5d358ad7a36b8e326e5ecf5fab54b0b145b31b5a644ecacb2b2e.json b/.sqlx/query-1f6114577d8a5d358ad7a36b8e326e5ecf5fab54b0b145b31b5a644ecacb2b2e.json
new file mode 100644
index 0000000..ea56d5a
--- /dev/null
+++ b/.sqlx/query-1f6114577d8a5d358ad7a36b8e326e5ecf5fab54b0b145b31b5a644ecacb2b2e.json
@@ -0,0 +1,15 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "insert into routing (id, configuration) values ($1, $2)",
+ "describe": {
+ "columns": [],
+ "parameters": {
+ "Left": [
+ "Uuid",
+ "Jsonb"
+ ]
+ },
+ "nullable": []
+ },
+ "hash": "1f6114577d8a5d358ad7a36b8e326e5ecf5fab54b0b145b31b5a644ecacb2b2e"
+}
diff --git a/.sqlx/query-2a92509e9bdb7c3a7d16cd87adee04048afed140e124c021b1578b3f5969c3f7.json b/.sqlx/query-2a92509e9bdb7c3a7d16cd87adee04048afed140e124c021b1578b3f5969c3f7.json
new file mode 100644
index 0000000..1eeeb3b
--- /dev/null
+++ b/.sqlx/query-2a92509e9bdb7c3a7d16cd87adee04048afed140e124c021b1578b3f5969c3f7.json
@@ -0,0 +1,26 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "select id, configuration as \"configuration: sqlx::types::Json<RoutingConfiguration>\" from routing where\n configuration->>'active' = 'true'",
+ "describe": {
+ "columns": [
+ {
+ "ordinal": 0,
+ "name": "id",
+ "type_info": "Uuid"
+ },
+ {
+ "ordinal": 1,
+ "name": "configuration: sqlx::types::Json<RoutingConfiguration>",
+ "type_info": "Jsonb"
+ }
+ ],
+ "parameters": {
+ "Left": []
+ },
+ "nullable": [
+ false,
+ false
+ ]
+ },
+ "hash": "2a92509e9bdb7c3a7d16cd87adee04048afed140e124c021b1578b3f5969c3f7"
+}
diff --git a/.sqlx/query-4fd63e6ec0ff3d61d790de01066341b25223bd39f1a51cc1abb60149e4a8092b.json b/.sqlx/query-4fd63e6ec0ff3d61d790de01066341b25223bd39f1a51cc1abb60149e4a8092b.json
new file mode 100644
index 0000000..e346609
--- /dev/null
+++ b/.sqlx/query-4fd63e6ec0ff3d61d790de01066341b25223bd39f1a51cc1abb60149e4a8092b.json
@@ -0,0 +1,28 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "\n delete from routing\n where id = $1\n returning id, configuration as \"configuration: sqlx::types::Json<RoutingConfiguration>\"\n ",
+ "describe": {
+ "columns": [
+ {
+ "ordinal": 0,
+ "name": "id",
+ "type_info": "Uuid"
+ },
+ {
+ "ordinal": 1,
+ "name": "configuration: sqlx::types::Json<RoutingConfiguration>",
+ "type_info": "Jsonb"
+ }
+ ],
+ "parameters": {
+ "Left": [
+ "Uuid"
+ ]
+ },
+ "nullable": [
+ false,
+ false
+ ]
+ },
+ "hash": "4fd63e6ec0ff3d61d790de01066341b25223bd39f1a51cc1abb60149e4a8092b"
+}
diff --git a/.sqlx/query-5f721c625db72ec257080ff3f35fcd1bffbafaa3085f56d3ec1dc39704ac3935.json b/.sqlx/query-5f721c625db72ec257080ff3f35fcd1bffbafaa3085f56d3ec1dc39704ac3935.json
new file mode 100644
index 0000000..3bfa58a
--- /dev/null
+++ b/.sqlx/query-5f721c625db72ec257080ff3f35fcd1bffbafaa3085f56d3ec1dc39704ac3935.json
@@ -0,0 +1,29 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "\n update routing\n set configuration = $1\n where id = $2\n returning id, configuration as \"configuration: sqlx::types::Json<RoutingConfiguration>\"\n ",
+ "describe": {
+ "columns": [
+ {
+ "ordinal": 0,
+ "name": "id",
+ "type_info": "Uuid"
+ },
+ {
+ "ordinal": 1,
+ "name": "configuration: sqlx::types::Json<RoutingConfiguration>",
+ "type_info": "Jsonb"
+ }
+ ],
+ "parameters": {
+ "Left": [
+ "Jsonb",
+ "Uuid"
+ ]
+ },
+ "nullable": [
+ false,
+ false
+ ]
+ },
+ "hash": "5f721c625db72ec257080ff3f35fcd1bffbafaa3085f56d3ec1dc39704ac3935"
+}
diff --git a/Cargo.lock b/Cargo.lock
index 7e65507..cdeea13 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3844,6 +3844,7 @@ dependencies = [
"config",
"metrics",
"metrics-exporter-prometheus",
+ "opentelemetry",
"opentelemetry-semantic-conventions",
"prost 0.14.1",
"serde",
@@ -3854,6 +3855,7 @@ dependencies = [
"tonic 0.14.0",
"tonic-reflection",
"tower",
+ "tower-http",
"tracing",
"tracing-opentelemetry",
"utoipa",
diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml
index f9b722f..b290f08 100644
--- a/crates/configuration/Cargo.toml
+++ b/crates/configuration/Cargo.toml
@@ -10,11 +10,12 @@ description.workspace = true
[dependencies]
anyhow.workspace = true
async-nats.workspace = true
-axum.workspace = true
+axum = { workspace = true, features = ["macros"] }
clap = { workspace = true, features = ["derive"] }
config = { workspace = true, features = ["convert-case", "toml"] }
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
+opentelemetry.workspace = true
opentelemetry-semantic-conventions.workspace = true
prost.workspace = true
serde = { workspace = true, features = ["derive"] }
@@ -26,12 +27,14 @@ sqlx = { workspace = true, features = [
"runtime-tokio",
"time",
"tls-rustls",
+ "uuid",
] }
time.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
tonic.workspace = true
tonic-reflection.workspace = true
tower = { workspace = true, features = ["steer"] }
+tower-http = { workspace = true, features = ["trace"] }
tracing.workspace = true
tracing-opentelemetry.workspace = true
utoipa = { workspace = true, features = ["axum_extras"] }
@@ -40,8 +43,8 @@ utoipa-rapidoc = { workspace = true, optional = true }
utoipa-redoc = { workspace = true, optional = true }
utoipa-scalar = { workspace = true, optional = true }
utoipa-swagger-ui = { workspace = true, optional = true }
-uuid = { workspace = true, features = ["serde"] }
-warden-core = { workspace = true, features = ["configuration", "serde-time"] }
+uuid = { workspace = true, features = ["serde", "v7"] }
+warden-core = { workspace = true, features = ["configuration", "openapi", "serde-time"] }
warden-middleware.workspace = true
[features]
diff --git a/crates/configuration/src/main.rs b/crates/configuration/src/main.rs
index 7eb5e3b..7dc8da6 100644
--- a/crates/configuration/src/main.rs
+++ b/crates/configuration/src/main.rs
@@ -2,9 +2,13 @@ mod cnfg;
mod server;
mod state;
+use std::net::{Ipv6Addr, SocketAddr};
+
+use crate::{server::error::AppError, state::AppState};
+use axum::http::header::CONTENT_TYPE;
use clap::Parser;
-use tracing::error;
-use warden_config::state::AppState;
+use tower::{make::Shared, steer::Steer};
+use tracing::{error, info};
use warden_stack::{Configuration, Services, tracing::Tracing};
/// warden-config
@@ -17,7 +21,7 @@ struct Args {
}
#[tokio::main]
-async fn main() -> anyhow::Result<()> {
+async fn main() -> Result<(), AppError> {
let args = Args::parse();
let config = include_str!("../warden-config.toml");
@@ -41,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
tokio::spawn(tracing.loki_task);
- let services = Services::builder()
+ let mut services = Services::builder()
.postgres(&config.database)
.await
.inspect_err(|e| error!("database: {e}"))?
@@ -68,9 +72,43 @@ async fn main() -> anyhow::Result<()> {
.take()
.ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
- let state = AppState::new(services, config, Some(provider))
- .await
- .inspect_err(|e| error!("{e}"))?;
+ let state = AppState::create(
+ crate::state::Services {
+ postgres,
+ cache,
+ jetstream,
+ },
+ &config,
+ )
+ .await?;
+
+ let (app, grpc_server) = server::serve(state)?;
+
+ let service = Steer::new(
+ vec![app, grpc_server],
+ |req: &axum::extract::Request, _services: &[_]| {
+ if req
+ .headers()
+ .get(CONTENT_TYPE)
+ .map(|content_type| content_type.as_bytes())
+ .filter(|content_type| content_type.starts_with(b"application/grpc"))
+ .is_some()
+ {
+ // grpc service
+ 1
+ } else {
+ // http service
+ 0
+ }
+ },
+ );
+
+ let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port));
+
+ let listener = tokio::net::TcpListener::bind(addr).await?;
+ info!(port = addr.port(), "starting config-api");
+
+ axum::serve(listener, Shared::new(service)).await?;
- server::serve(state);
+ Ok(())
}
diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs
index 1131144..dec0813 100644
--- a/crates/configuration/src/server.rs
+++ b/crates/configuration/src/server.rs
@@ -1,13 +1,13 @@
-mod error;
+pub mod error;
+pub mod grpc_svc;
mod http_svc;
-mod interceptor;
+pub mod reload_stream;
mod version;
-use axum::http::header::CONTENT_TYPE;
+use grpc_svc::interceptor::MyInterceptor;
use http_svc::build_router;
-use interceptor::MyInterceptor;
use tonic::service::Routes;
-use tower::{make::Shared, steer::Steer};
+use tower_http::trace::TraceLayer;
use warden_core::{
FILE_DESCRIPTOR_SET,
configuration::routing::{
@@ -15,9 +15,9 @@ use warden_core::{
},
};
-use crate::state::AppHandle;
+use crate::{server::error::AppError, state::AppHandle};
-pub async fn serve<S>(state: AppHandle) -> anyhow::Result<Shared<S>> {
+pub fn serve(state: AppHandle) -> Result<(axum::Router, axum::Router), AppError> {
let app = build_router(state.clone());
let service = QueryRoutingServer::with_interceptor(state.clone(), MyInterceptor);
@@ -32,26 +32,14 @@ pub async fn serve<S>(state: AppHandle) -> anyhow::Result<Shared<S>> {
MyInterceptor,
))
.add_service(routing_reflector)
- .into_axum_router();
-
- let service = Steer::new(
- vec![app, grpc_server],
- |req: &axum::extract::Request, _services: &[_]| {
- if req
- .headers()
- .get(CONTENT_TYPE)
- .map(|content_type| content_type.as_bytes())
- .filter(|content_type| content_type.starts_with(b"application/grpc"))
- .is_some()
- {
- // grpc service
- 1
- } else {
- // http service
- 0
- }
- },
- );
-
- Ok(Shared::new(service))
+ .into_axum_router()
+ .layer(
+ TraceLayer::new_for_grpc().make_span_with(|request: &axum::http::Request<_>| {
+ tracing::trace_span!(env!("CARGO_PKG_NAME"), "otel.kind" = "server",
+ headers = ?request.headers()
+ )
+ }),
+ );
+
+ Ok((app, grpc_server))
}
diff --git a/crates/configuration/src/server/grpc_svc.rs b/crates/configuration/src/server/grpc_svc.rs
new file mode 100644
index 0000000..42aa871
--- /dev/null
+++ b/crates/configuration/src/server/grpc_svc.rs
@@ -0,0 +1,24 @@
+pub mod interceptor {
+ use opentelemetry::global;
+ use tonic::{Status, service::Interceptor};
+ use tracing::Span;
+ use tracing_opentelemetry::OpenTelemetrySpanExt;
+ use warden_stack::tracing::telemetry::tonic::extractor;
+
+ #[derive(Clone, Copy)]
+ pub struct MyInterceptor;
+
+ impl Interceptor for MyInterceptor {
+ fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
+ let span = Span::current();
+
+ let cx = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&extractor::MetadataMap(request.metadata()))
+ });
+
+ span.set_parent(cx);
+
+ Ok(request)
+ }
+ }
+}
diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs
index 7b2a258..45cdcfa 100644
--- a/crates/configuration/src/server/http_svc.rs
+++ b/crates/configuration/src/server/http_svc.rs
@@ -8,7 +8,7 @@ use utoipa_redoc::Servable;
#[cfg(feature = "scalar")]
use utoipa_scalar::Servable as _;
-use crate::{server::http_svc, state::AppHandle};
+use crate::state::AppHandle;
const TAG_ROUTING: &str = "Routing";
diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs
index cc065e8..e70dccd 100644
--- a/crates/configuration/src/server/http_svc/routes.rs
+++ b/crates/configuration/src/server/http_svc/routes.rs
@@ -6,6 +6,6 @@ use crate::state::AppHandle;
pub fn router(store: AppHandle) -> OpenApiRouter {
OpenApiRouter::new()
- .routes(routes!(routing::get_active))
+ .routes(routes!(routing::get_active::active_routing))
.with_state(store)
}
diff --git a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
index 562a1f1..9dc22e3 100644
--- a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
+++ b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
@@ -1,5 +1,9 @@
use axum::{extract::State, response::IntoResponse};
-use warden_core::configuration::routing::RoutingConfiguration;
+use tonic::IntoRequest;
+use warden_core::{
+ configuration::routing::{RoutingConfiguration, query_routing_server::QueryRouting},
+ google,
+};
use crate::{
server::{error::AppError, http_svc::TAG_ROUTING, version::Version},
@@ -22,9 +26,13 @@ use crate::{
]
#[axum::debug_handler]
#[tracing::instrument(skip(state), err(Debug), fields(method = "GET"))]
-pub(super) async fn active_routing(
+pub async fn active_routing(
version: Version,
State(state): State<AppHandle>,
) -> Result<impl IntoResponse, AppError> {
- Ok(String::default().into_response())
+ let config = state
+ .get_active_routing_configuration(google::protobuf::Empty::default().into_request())
+ .await?
+ .into_inner();
+ Ok(axum::Json(config.configuration).into_response())
}
diff --git a/crates/configuration/src/server/interceptor.rs b/crates/configuration/src/server/interceptor.rs
deleted file mode 100644
index eeb36c2..0000000
--- a/crates/configuration/src/server/interceptor.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-use tonic::{Status, service::Interceptor};
-use tracing::Span;
-use warden_stack::{
- opentelemetry::global, tracing::telemetry::tonic::extractor,
- tracing_opentelemetry::OpenTelemetrySpanExt,
-};
-
-#[derive(Clone, Copy)]
-pub struct MyInterceptor;
-
-impl Interceptor for MyInterceptor {
- fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
- let span = Span::current();
-
- let cx = global::get_text_map_propagator(|propagator| {
- propagator.extract(&extractor::MetadataMap(request.metadata()))
- });
-
- span.set_parent(cx);
-
- Ok(request)
- }
-}
diff --git a/crates/configuration/src/server/reload_stream.rs b/crates/configuration/src/server/reload_stream.rs
new file mode 100644
index 0000000..d2ee4ab
--- /dev/null
+++ b/crates/configuration/src/server/reload_stream.rs
@@ -0,0 +1,21 @@
+use async_nats::jetstream::Context;
+use tracing::{debug, info};
+
+use crate::cnfg::JetstreamConfig;
+
+pub async fn create_stream(jetstream: &Context, config: &JetstreamConfig) -> anyhow::Result<()> {
+ debug!(name = ?config.stream, "initialising stream");
+
+ jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name: config.stream.to_string(),
+ max_messages: config.max_messages,
+ subjects: vec![config.subject.to_string()],
+ ..Default::default()
+ })
+ .await?;
+
+ info!(name = ?config.stream, subject = ?config.subject, "stream is ready");
+
+ Ok(())
+}
diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs
index 7672891..5a51d5b 100644
--- a/crates/configuration/src/state.rs
+++ b/crates/configuration/src/state.rs
@@ -4,13 +4,14 @@ mod routing;
use async_nats::jetstream::Context;
use sqlx::PgPool;
use std::{ops::Deref, sync::Arc};
-use tonic::transport::Endpoint;
-use tracing::error;
-use warden_stack::{Configuration, cache::RedisManager};
+use tracing::{instrument, trace};
+use warden_core::configuration::ReloadEvent;
+use warden_stack::{Configuration, cache::RedisManager, redis::AsyncCommands};
use crate::{
cnfg::LocalConfig,
- server::grpc::interceptor::{Intercepted, MyInterceptor},
+ server::{error::AppError, reload_stream::create_stream},
+ state::cache_key::CacheKey,
};
#[derive(Clone)]
@@ -43,9 +44,44 @@ impl AppState {
) -> Result<AppHandle, AppError> {
let local_config: LocalConfig = serde_json::from_value(configuration.misc.clone())?;
+ create_stream(&services.jetstream, &local_config.nats).await?;
+
Ok(AppHandle(Arc::new(Self {
services,
app_config: local_config,
})))
}
}
+
+#[instrument(skip(state), err(Debug))]
+pub async fn invalidate_cache(state: &AppHandle, key: CacheKey<'_>) -> Result<(), tonic::Status> {
+ trace!("invalidating cache");
+ let mut cache = state
+ .services
+ .cache
+ .get()
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ cache
+ .del::<_, ()>(key)
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))
+}
+
+#[instrument(skip(state), err(Debug))]
+pub async fn publish_reload(
+ state: &AppHandle,
+ prefix: &str,
+ event: ReloadEvent,
+) -> Result<(), tonic::Status> {
+ trace!("publishing reload event");
+ state
+ .services
+ .jetstream
+ .publish(format!("{prefix}.reload"), event.as_str_name().into())
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ Ok(())
+}
diff --git a/crates/configuration/src/state/routing.rs b/crates/configuration/src/state/routing.rs
index ea51c17..d71af04 100644
--- a/crates/configuration/src/state/routing.rs
+++ b/crates/configuration/src/state/routing.rs
@@ -1 +1,2 @@
-mod query;
+mod mutate_routing;
+mod query_routing;
diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs
new file mode 100644
index 0000000..0c0637a
--- /dev/null
+++ b/crates/configuration/src/state/routing/mutate_routing.rs
@@ -0,0 +1,150 @@
+use opentelemetry_semantic_conventions::attribute;
+use tonic::{Request, Response, Status, async_trait};
+use tracing::{Instrument, error, info_span, instrument};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use uuid::Uuid;
+use warden_core::configuration::{
+ ReloadEvent,
+ routing::{
+ DeleteConfigurationRequest, RoutingConfiguration, UpdateRoutingRequest,
+ mutate_routing_server::MutateRouting,
+ },
+};
+
+use crate::state::{AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload};
+
+#[allow(dead_code)]
+struct RoutingRow {
+ id: Uuid,
+ configuration: sqlx::types::Json<RoutingConfiguration>,
+}
+
+#[async_trait]
+impl MutateRouting for AppHandle {
+ #[instrument(skip(self, request))]
+ async fn create_routing_configuration(
+ &self,
+ request: Request<RoutingConfiguration>,
+ ) -> Result<Response<RoutingConfiguration>, Status> {
+ let request = request.into_inner();
+ let span = info_span!("create.configuration.routing");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+
+ sqlx::query!(
+ "insert into routing (id, configuration) values ($1, $2)",
+ Uuid::now_v7(),
+ sqlx::types::Json(&request) as _
+ )
+ .execute(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ Ok(tonic::Response::new(request))
+ }
+
+ async fn update_routing_configuration(
+ &self,
+ request: Request<UpdateRoutingRequest>,
+ ) -> Result<Response<RoutingConfiguration>, Status> {
+ let conf = self
+ .app_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .expect("bad config");
+
+ let request = request.into_inner();
+ let id = Uuid::parse_str(&request.id)
+ .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?;
+
+ let config = request.configuration.expect("configuration to be provided");
+
+ let span = info_span!("update.configuration.routing");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "update");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+
+ let updated = sqlx::query_as!(
+ RoutingRow,
+ r#"
+ update routing
+ set configuration = $1
+ where id = $2
+ returning id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>"
+ "#,
+ sqlx::types::Json(&config) as _,
+ id
+ )
+ .fetch_one(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ let (_del_result, _publish_result) = tokio::try_join!(
+ invalidate_cache(self, CacheKey::Routing(&id)),
+ publish_reload(self, conf, ReloadEvent::Routing)
+ )?;
+
+ let res = updated.configuration.0;
+
+ Ok(Response::new(res))
+ }
+
+ async fn delete_routing_configuration(
+ &self,
+ request: Request<DeleteConfigurationRequest>,
+ ) -> Result<Response<RoutingConfiguration>, Status> {
+ let conf = self
+ .app_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .expect("bad config");
+
+ let request = request.into_inner();
+ let id = Uuid::parse_str(&request.id)
+ .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?;
+
+ let span = info_span!("delete.configuration.routing");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "delete");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+
+ let updated = sqlx::query_as!(
+ RoutingRow,
+ r#"
+ delete from routing
+ where id = $1
+ returning id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>"
+ "#,
+ id
+ )
+ .fetch_one(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ let (_del_result, _publish_result) = tokio::try_join!(
+ invalidate_cache(self, CacheKey::Routing(&id)),
+ publish_reload(self, conf, ReloadEvent::Routing)
+ )?;
+
+ let res = updated.configuration.0;
+
+ Ok(Response::new(res))
+ }
+}
diff --git a/crates/configuration/src/state/routing/query.rs b/crates/configuration/src/state/routing/query_routing.rs
index 3c6814d..3c6814d 100644
--- a/crates/configuration/src/state/routing/query.rs
+++ b/crates/configuration/src/state/routing/query_routing.rs
diff --git a/lib/warden-core/build.rs b/lib/warden-core/build.rs
index 5f1e898..3992cd8 100644
--- a/lib/warden-core/build.rs
+++ b/lib/warden-core/build.rs
@@ -20,7 +20,10 @@ impl Entity {
#[cfg(feature = "configuration")]
fn configuration_protos() -> Vec<&'static str> {
- vec!["proto/configuration/routing.proto"]
+ vec![
+ "proto/configuration/routing.proto",
+ "proto/configuration/reload_event.proto",
+ ]
}
#[cfg(feature = "pseudonyms")]
diff --git a/lib/warden-core/src/configuration.rs b/lib/warden-core/src/configuration.rs
index da589c2..b620914 100644
--- a/lib/warden-core/src/configuration.rs
+++ b/lib/warden-core/src/configuration.rs
@@ -1,3 +1,5 @@
+tonic::include_proto!("configuration");
+
pub mod routing {
tonic::include_proto!("configuration.routing");
}
diff --git a/lib/warden-middleware/Cargo.toml b/lib/warden-middleware/Cargo.toml
index b23e61e..97c2c88 100644
--- a/lib/warden-middleware/Cargo.toml
+++ b/lib/warden-middleware/Cargo.toml
@@ -14,5 +14,6 @@ metrics.workspace = true
metrics-exporter-prometheus.workspace = true
tower-http = { workspace = true, features = [
"request-id",
+ "trace",
] }
tracing.workspace = true
diff --git a/lib/warden-middleware/src/trace_layer.rs b/lib/warden-middleware/src/trace_layer.rs
index 5173e8d..5792c09 100644
--- a/lib/warden-middleware/src/trace_layer.rs
+++ b/lib/warden-middleware/src/trace_layer.rs
@@ -17,7 +17,8 @@ pub fn apply_trace_context_middleware<T: Clone + Send + Sync + 'static>(
info_span!(
"http_request",
request_id = ?request_id,
- headers = ?request.headers()
+ headers = ?request.headers(),
+ "otel.kind" = "server"
)
}),
)
diff --git a/proto/configuration/reload_event.proto b/proto/configuration/reload_event.proto
new file mode 100644
index 0000000..a77645f
--- /dev/null
+++ b/proto/configuration/reload_event.proto
@@ -0,0 +1,7 @@
+syntax = "proto3";
+
+package configuration;
+
+enum ReloadEvent {
+ ROUTING = 0;
+}