aboutsummaryrefslogtreecommitdiffstats
path: root/crates/configuration/src/server
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-12 14:00:28 +0200
committerrtkay123 <dev@kanjala.com>2025-08-12 14:00:28 +0200
commit1d347dd2142a266552812ac2f8844acf52d2dc1c (patch)
tree70beaeb0bf572b9ef7323cc98b7b52084011d2d8 /crates/configuration/src/server
parentc5ea875f544824b0c042bf7c0a58b3134f9c0373 (diff)
downloadwarden-1d347dd2142a266552812ac2f8844acf52d2dc1c.tar.bz2
warden-1d347dd2142a266552812ac2f8844acf52d2dc1c.zip
feat(config): reload config
Diffstat (limited to 'crates/configuration/src/server')
-rw-r--r--crates/configuration/src/server/grpc_svc.rs24
-rw-r--r--crates/configuration/src/server/http_svc.rs2
-rw-r--r--crates/configuration/src/server/http_svc/routes.rs2
-rw-r--r--crates/configuration/src/server/http_svc/routes/routing/get_active.rs14
-rw-r--r--crates/configuration/src/server/interceptor.rs23
-rw-r--r--crates/configuration/src/server/reload_stream.rs21
6 files changed, 58 insertions, 28 deletions
diff --git a/crates/configuration/src/server/grpc_svc.rs b/crates/configuration/src/server/grpc_svc.rs
new file mode 100644
index 0000000..42aa871
--- /dev/null
+++ b/crates/configuration/src/server/grpc_svc.rs
@@ -0,0 +1,24 @@
+pub mod interceptor {
+ use opentelemetry::global;
+ use tonic::{Status, service::Interceptor};
+ use tracing::Span;
+ use tracing_opentelemetry::OpenTelemetrySpanExt;
+ use warden_stack::tracing::telemetry::tonic::extractor;
+
+ #[derive(Clone, Copy)]
+ pub struct MyInterceptor;
+
+ impl Interceptor for MyInterceptor {
+ fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
+ let span = Span::current();
+
+ let cx = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&extractor::MetadataMap(request.metadata()))
+ });
+
+ span.set_parent(cx);
+
+ Ok(request)
+ }
+ }
+}
diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs
index 7b2a258..45cdcfa 100644
--- a/crates/configuration/src/server/http_svc.rs
+++ b/crates/configuration/src/server/http_svc.rs
@@ -8,7 +8,7 @@ use utoipa_redoc::Servable;
#[cfg(feature = "scalar")]
use utoipa_scalar::Servable as _;
-use crate::{server::http_svc, state::AppHandle};
+use crate::state::AppHandle;
const TAG_ROUTING: &str = "Routing";
diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs
index cc065e8..e70dccd 100644
--- a/crates/configuration/src/server/http_svc/routes.rs
+++ b/crates/configuration/src/server/http_svc/routes.rs
@@ -6,6 +6,6 @@ use crate::state::AppHandle;
pub fn router(store: AppHandle) -> OpenApiRouter {
OpenApiRouter::new()
- .routes(routes!(routing::get_active))
+ .routes(routes!(routing::get_active::active_routing))
.with_state(store)
}
diff --git a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
index 562a1f1..9dc22e3 100644
--- a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
+++ b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
@@ -1,5 +1,9 @@
use axum::{extract::State, response::IntoResponse};
-use warden_core::configuration::routing::RoutingConfiguration;
+use tonic::IntoRequest;
+use warden_core::{
+ configuration::routing::{RoutingConfiguration, query_routing_server::QueryRouting},
+ google,
+};
use crate::{
server::{error::AppError, http_svc::TAG_ROUTING, version::Version},
@@ -22,9 +26,13 @@ use crate::{
]
#[axum::debug_handler]
#[tracing::instrument(skip(state), err(Debug), fields(method = "GET"))]
-pub(super) async fn active_routing(
+pub async fn active_routing(
version: Version,
State(state): State<AppHandle>,
) -> Result<impl IntoResponse, AppError> {
- Ok(String::default().into_response())
+ let config = state
+ .get_active_routing_configuration(google::protobuf::Empty::default().into_request())
+ .await?
+ .into_inner();
+ Ok(axum::Json(config.configuration).into_response())
}
diff --git a/crates/configuration/src/server/interceptor.rs b/crates/configuration/src/server/interceptor.rs
deleted file mode 100644
index eeb36c2..0000000
--- a/crates/configuration/src/server/interceptor.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-use tonic::{Status, service::Interceptor};
-use tracing::Span;
-use warden_stack::{
- opentelemetry::global, tracing::telemetry::tonic::extractor,
- tracing_opentelemetry::OpenTelemetrySpanExt,
-};
-
-#[derive(Clone, Copy)]
-pub struct MyInterceptor;
-
-impl Interceptor for MyInterceptor {
- fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
- let span = Span::current();
-
- let cx = global::get_text_map_propagator(|propagator| {
- propagator.extract(&extractor::MetadataMap(request.metadata()))
- });
-
- span.set_parent(cx);
-
- Ok(request)
- }
-}
diff --git a/crates/configuration/src/server/reload_stream.rs b/crates/configuration/src/server/reload_stream.rs
new file mode 100644
index 0000000..d2ee4ab
--- /dev/null
+++ b/crates/configuration/src/server/reload_stream.rs
@@ -0,0 +1,21 @@
+use async_nats::jetstream::Context;
+use tracing::{debug, info};
+
+use crate::cnfg::JetstreamConfig;
+
+pub async fn create_stream(jetstream: &Context, config: &JetstreamConfig) -> anyhow::Result<()> {
+ debug!(name = ?config.stream, "initialising stream");
+
+ jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name: config.stream.to_string(),
+ max_messages: config.max_messages,
+ subjects: vec![config.subject.to_string()],
+ ..Default::default()
+ })
+ .await?;
+
+ info!(name = ?config.stream, subject = ?config.subject, "stream is ready");
+
+ Ok(())
+}