diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-12 14:00:28 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-12 14:00:28 +0200 |
commit | 1d347dd2142a266552812ac2f8844acf52d2dc1c (patch) | |
tree | 70beaeb0bf572b9ef7323cc98b7b52084011d2d8 | |
parent | c5ea875f544824b0c042bf7c0a58b3134f9c0373 (diff) | |
download | warden-1d347dd2142a266552812ac2f8844acf52d2dc1c.tar.bz2 warden-1d347dd2142a266552812ac2f8844acf52d2dc1c.zip |
feat(config): reload config
23 files changed, 435 insertions, 75 deletions
diff --git a/.sqlx/query-1f6114577d8a5d358ad7a36b8e326e5ecf5fab54b0b145b31b5a644ecacb2b2e.json b/.sqlx/query-1f6114577d8a5d358ad7a36b8e326e5ecf5fab54b0b145b31b5a644ecacb2b2e.json new file mode 100644 index 0000000..ea56d5a --- /dev/null +++ b/.sqlx/query-1f6114577d8a5d358ad7a36b8e326e5ecf5fab54b0b145b31b5a644ecacb2b2e.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "insert into routing (id, configuration) values ($1, $2)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Jsonb" + ] + }, + "nullable": [] + }, + "hash": "1f6114577d8a5d358ad7a36b8e326e5ecf5fab54b0b145b31b5a644ecacb2b2e" +} diff --git a/.sqlx/query-2a92509e9bdb7c3a7d16cd87adee04048afed140e124c021b1578b3f5969c3f7.json b/.sqlx/query-2a92509e9bdb7c3a7d16cd87adee04048afed140e124c021b1578b3f5969c3f7.json new file mode 100644 index 0000000..1eeeb3b --- /dev/null +++ b/.sqlx/query-2a92509e9bdb7c3a7d16cd87adee04048afed140e124c021b1578b3f5969c3f7.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "select id, configuration as \"configuration: sqlx::types::Json<RoutingConfiguration>\" from routing where\n configuration->>'active' = 'true'", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "configuration: sqlx::types::Json<RoutingConfiguration>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "2a92509e9bdb7c3a7d16cd87adee04048afed140e124c021b1578b3f5969c3f7" +} diff --git a/.sqlx/query-4fd63e6ec0ff3d61d790de01066341b25223bd39f1a51cc1abb60149e4a8092b.json b/.sqlx/query-4fd63e6ec0ff3d61d790de01066341b25223bd39f1a51cc1abb60149e4a8092b.json new file mode 100644 index 0000000..e346609 --- /dev/null +++ b/.sqlx/query-4fd63e6ec0ff3d61d790de01066341b25223bd39f1a51cc1abb60149e4a8092b.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n delete from routing\n where id = $1\n returning id, configuration as \"configuration: sqlx::types::Json<RoutingConfiguration>\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "configuration: sqlx::types::Json<RoutingConfiguration>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "4fd63e6ec0ff3d61d790de01066341b25223bd39f1a51cc1abb60149e4a8092b" +} diff --git a/.sqlx/query-5f721c625db72ec257080ff3f35fcd1bffbafaa3085f56d3ec1dc39704ac3935.json b/.sqlx/query-5f721c625db72ec257080ff3f35fcd1bffbafaa3085f56d3ec1dc39704ac3935.json new file mode 100644 index 0000000..3bfa58a --- /dev/null +++ b/.sqlx/query-5f721c625db72ec257080ff3f35fcd1bffbafaa3085f56d3ec1dc39704ac3935.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "\n update routing\n set configuration = $1\n where id = $2\n returning id, configuration as \"configuration: sqlx::types::Json<RoutingConfiguration>\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "configuration: sqlx::types::Json<RoutingConfiguration>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Jsonb", + "Uuid" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "5f721c625db72ec257080ff3f35fcd1bffbafaa3085f56d3ec1dc39704ac3935" +} @@ -3844,6 +3844,7 @@ dependencies = [ "config", "metrics", "metrics-exporter-prometheus", + "opentelemetry", "opentelemetry-semantic-conventions", "prost 0.14.1", "serde", @@ -3854,6 +3855,7 @@ dependencies = [ "tonic 0.14.0", "tonic-reflection", "tower", + "tower-http", "tracing", "tracing-opentelemetry", "utoipa", diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index f9b722f..b290f08 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -10,11 +10,12 @@ description.workspace = true [dependencies] anyhow.workspace = true async-nats.workspace = true -axum.workspace = true +axum = { workspace = true, features = ["macros"] } clap = { workspace = true, features = ["derive"] } config = { workspace = true, features = ["convert-case", "toml"] } metrics.workspace = true metrics-exporter-prometheus.workspace = true +opentelemetry.workspace = true opentelemetry-semantic-conventions.workspace = true prost.workspace = true serde = { workspace = true, features = ["derive"] } @@ -26,12 +27,14 @@ sqlx = { workspace = true, features = [ "runtime-tokio", "time", "tls-rustls", + "uuid", ] } time.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tonic.workspace = true tonic-reflection.workspace = true tower = { workspace = true, features = ["steer"] } +tower-http = { workspace = true, features = ["trace"] } tracing.workspace = true tracing-opentelemetry.workspace = true utoipa = { workspace = true, features = ["axum_extras"] } @@ -40,8 +43,8 @@ utoipa-rapidoc = { workspace = true, optional = true } utoipa-redoc = { workspace = true, optional = true } utoipa-scalar = { workspace = true, optional = true } utoipa-swagger-ui = { workspace = true, optional = true } -uuid = { workspace = true, features = ["serde"] } -warden-core = { workspace = true, features = ["configuration", "serde-time"] } +uuid = { workspace = true, features = ["serde", "v7"] } +warden-core = { workspace = true, features = ["configuration", "openapi", "serde-time"] } warden-middleware.workspace = true [features] diff --git a/crates/configuration/src/main.rs b/crates/configuration/src/main.rs index 7eb5e3b..7dc8da6 100644 --- a/crates/configuration/src/main.rs +++ b/crates/configuration/src/main.rs @@ -2,9 +2,13 @@ mod cnfg; mod server; mod state; +use std::net::{Ipv6Addr, SocketAddr}; + +use crate::{server::error::AppError, state::AppState}; +use axum::http::header::CONTENT_TYPE; use clap::Parser; -use tracing::error; -use warden_config::state::AppState; +use tower::{make::Shared, steer::Steer}; +use tracing::{error, info}; use warden_stack::{Configuration, Services, tracing::Tracing}; /// warden-config @@ -17,7 +21,7 @@ struct Args { } #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<(), AppError> { let args = Args::parse(); let config = include_str!("../warden-config.toml"); @@ -41,7 +45,7 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(tracing.loki_task); - let services = Services::builder() + let mut services = Services::builder() .postgres(&config.database) .await .inspect_err(|e| error!("database: {e}"))? @@ -68,9 +72,43 @@ async fn main() -> anyhow::Result<()> { .take() .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; - let state = AppState::new(services, config, Some(provider)) - .await - .inspect_err(|e| error!("{e}"))?; + let state = AppState::create( + crate::state::Services { + postgres, + cache, + jetstream, + }, + &config, + ) + .await?; + + let (app, grpc_server) = server::serve(state)?; + + let service = Steer::new( + vec![app, grpc_server], + |req: &axum::extract::Request, _services: &[_]| { + if req + .headers() + .get(CONTENT_TYPE) + .map(|content_type| content_type.as_bytes()) + .filter(|content_type| content_type.starts_with(b"application/grpc")) + .is_some() + { + // grpc service + 1 + } else { + // http service + 0 + } + }, + ); + + let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port)); + + let listener = tokio::net::TcpListener::bind(addr).await?; + info!(port = addr.port(), "starting config-api"); + + axum::serve(listener, Shared::new(service)).await?; - server::serve(state); + Ok(()) } diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs index 1131144..dec0813 100644 --- a/crates/configuration/src/server.rs +++ b/crates/configuration/src/server.rs @@ -1,13 +1,13 @@ -mod error; +pub mod error; +pub mod grpc_svc; mod http_svc; -mod interceptor; +pub mod reload_stream; mod version; -use axum::http::header::CONTENT_TYPE; +use grpc_svc::interceptor::MyInterceptor; use http_svc::build_router; -use interceptor::MyInterceptor; use tonic::service::Routes; -use tower::{make::Shared, steer::Steer}; +use tower_http::trace::TraceLayer; use warden_core::{ FILE_DESCRIPTOR_SET, configuration::routing::{ @@ -15,9 +15,9 @@ use warden_core::{ }, }; -use crate::state::AppHandle; +use crate::{server::error::AppError, state::AppHandle}; -pub async fn serve<S>(state: AppHandle) -> anyhow::Result<Shared<S>> { +pub fn serve(state: AppHandle) -> Result<(axum::Router, axum::Router), AppError> { let app = build_router(state.clone()); let service = QueryRoutingServer::with_interceptor(state.clone(), MyInterceptor); @@ -32,26 +32,14 @@ pub async fn serve<S>(state: AppHandle) -> anyhow::Result<Shared<S>> { MyInterceptor, )) .add_service(routing_reflector) - .into_axum_router(); - - let service = Steer::new( - vec![app, grpc_server], - |req: &axum::extract::Request, _services: &[_]| { - if req - .headers() - .get(CONTENT_TYPE) - .map(|content_type| content_type.as_bytes()) - .filter(|content_type| content_type.starts_with(b"application/grpc")) - .is_some() - { - // grpc service - 1 - } else { - // http service - 0 - } - }, - ); - - Ok(Shared::new(service)) + .into_axum_router() + .layer( + TraceLayer::new_for_grpc().make_span_with(|request: &axum::http::Request<_>| { + tracing::trace_span!(env!("CARGO_PKG_NAME"), "otel.kind" = "server", + headers = ?request.headers() + ) + }), + ); + + Ok((app, grpc_server)) } diff --git a/crates/configuration/src/server/grpc_svc.rs b/crates/configuration/src/server/grpc_svc.rs new file mode 100644 index 0000000..42aa871 --- /dev/null +++ b/crates/configuration/src/server/grpc_svc.rs @@ -0,0 +1,24 @@ +pub mod interceptor { + use opentelemetry::global; + use tonic::{Status, service::Interceptor}; + use tracing::Span; + use tracing_opentelemetry::OpenTelemetrySpanExt; + use warden_stack::tracing::telemetry::tonic::extractor; + + #[derive(Clone, Copy)] + pub struct MyInterceptor; + + impl Interceptor for MyInterceptor { + fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + let span = Span::current(); + + let cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor::MetadataMap(request.metadata())) + }); + + span.set_parent(cx); + + Ok(request) + } + } +} diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs index 7b2a258..45cdcfa 100644 --- a/crates/configuration/src/server/http_svc.rs +++ b/crates/configuration/src/server/http_svc.rs @@ -8,7 +8,7 @@ use utoipa_redoc::Servable; #[cfg(feature = "scalar")] use utoipa_scalar::Servable as _; -use crate::{server::http_svc, state::AppHandle}; +use crate::state::AppHandle; const TAG_ROUTING: &str = "Routing"; diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs index cc065e8..e70dccd 100644 --- a/crates/configuration/src/server/http_svc/routes.rs +++ b/crates/configuration/src/server/http_svc/routes.rs @@ -6,6 +6,6 @@ use crate::state::AppHandle; pub fn router(store: AppHandle) -> OpenApiRouter { OpenApiRouter::new() - .routes(routes!(routing::get_active)) + .routes(routes!(routing::get_active::active_routing)) .with_state(store) } diff --git a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs index 562a1f1..9dc22e3 100644 --- a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs +++ b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs @@ -1,5 +1,9 @@ use axum::{extract::State, response::IntoResponse}; -use warden_core::configuration::routing::RoutingConfiguration; +use tonic::IntoRequest; +use warden_core::{ + configuration::routing::{RoutingConfiguration, query_routing_server::QueryRouting}, + google, +}; use crate::{ server::{error::AppError, http_svc::TAG_ROUTING, version::Version}, @@ -22,9 +26,13 @@ use crate::{ ] #[axum::debug_handler] #[tracing::instrument(skip(state), err(Debug), fields(method = "GET"))] -pub(super) async fn active_routing( +pub async fn active_routing( version: Version, State(state): State<AppHandle>, ) -> Result<impl IntoResponse, AppError> { - Ok(String::default().into_response()) + let config = state + .get_active_routing_configuration(google::protobuf::Empty::default().into_request()) + .await? + .into_inner(); + Ok(axum::Json(config.configuration).into_response()) } diff --git a/crates/configuration/src/server/interceptor.rs b/crates/configuration/src/server/interceptor.rs deleted file mode 100644 index eeb36c2..0000000 --- a/crates/configuration/src/server/interceptor.rs +++ /dev/null @@ -1,23 +0,0 @@ -use tonic::{Status, service::Interceptor}; -use tracing::Span; -use warden_stack::{ - opentelemetry::global, tracing::telemetry::tonic::extractor, - tracing_opentelemetry::OpenTelemetrySpanExt, -}; - -#[derive(Clone, Copy)] -pub struct MyInterceptor; - -impl Interceptor for MyInterceptor { - fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { - let span = Span::current(); - - let cx = global::get_text_map_propagator(|propagator| { - propagator.extract(&extractor::MetadataMap(request.metadata())) - }); - - span.set_parent(cx); - - Ok(request) - } -} diff --git a/crates/configuration/src/server/reload_stream.rs b/crates/configuration/src/server/reload_stream.rs new file mode 100644 index 0000000..d2ee4ab --- /dev/null +++ b/crates/configuration/src/server/reload_stream.rs @@ -0,0 +1,21 @@ +use async_nats::jetstream::Context; +use tracing::{debug, info}; + +use crate::cnfg::JetstreamConfig; + +pub async fn create_stream(jetstream: &Context, config: &JetstreamConfig) -> anyhow::Result<()> { + debug!(name = ?config.stream, "initialising stream"); + + jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: config.stream.to_string(), + max_messages: config.max_messages, + subjects: vec![config.subject.to_string()], + ..Default::default() + }) + .await?; + + info!(name = ?config.stream, subject = ?config.subject, "stream is ready"); + + Ok(()) +} diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs index 7672891..5a51d5b 100644 --- a/crates/configuration/src/state.rs +++ b/crates/configuration/src/state.rs @@ -4,13 +4,14 @@ mod routing; use async_nats::jetstream::Context; use sqlx::PgPool; use std::{ops::Deref, sync::Arc}; -use tonic::transport::Endpoint; -use tracing::error; -use warden_stack::{Configuration, cache::RedisManager}; +use tracing::{instrument, trace}; +use warden_core::configuration::ReloadEvent; +use warden_stack::{Configuration, cache::RedisManager, redis::AsyncCommands}; use crate::{ cnfg::LocalConfig, - server::grpc::interceptor::{Intercepted, MyInterceptor}, + server::{error::AppError, reload_stream::create_stream}, + state::cache_key::CacheKey, }; #[derive(Clone)] @@ -43,9 +44,44 @@ impl AppState { ) -> Result<AppHandle, AppError> { let local_config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + create_stream(&services.jetstream, &local_config.nats).await?; + Ok(AppHandle(Arc::new(Self { services, app_config: local_config, }))) } } + +#[instrument(skip(state), err(Debug))] +pub async fn invalidate_cache(state: &AppHandle, key: CacheKey<'_>) -> Result<(), tonic::Status> { + trace!("invalidating cache"); + let mut cache = state + .services + .cache + .get() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + cache + .del::<_, ()>(key) + .await + .map_err(|e| tonic::Status::internal(e.to_string())) +} + +#[instrument(skip(state), err(Debug))] +pub async fn publish_reload( + state: &AppHandle, + prefix: &str, + event: ReloadEvent, +) -> Result<(), tonic::Status> { + trace!("publishing reload event"); + state + .services + .jetstream + .publish(format!("{prefix}.reload"), event.as_str_name().into()) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + Ok(()) +} diff --git a/crates/configuration/src/state/routing.rs b/crates/configuration/src/state/routing.rs index ea51c17..d71af04 100644 --- a/crates/configuration/src/state/routing.rs +++ b/crates/configuration/src/state/routing.rs @@ -1 +1,2 @@ -mod query; +mod mutate_routing; +mod query_routing; diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs new file mode 100644 index 0000000..0c0637a --- /dev/null +++ b/crates/configuration/src/state/routing/mutate_routing.rs @@ -0,0 +1,150 @@ +use opentelemetry_semantic_conventions::attribute; +use tonic::{Request, Response, Status, async_trait}; +use tracing::{Instrument, error, info_span, instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; +use warden_core::configuration::{ + ReloadEvent, + routing::{ + DeleteConfigurationRequest, RoutingConfiguration, UpdateRoutingRequest, + mutate_routing_server::MutateRouting, + }, +}; + +use crate::state::{AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload}; + +#[allow(dead_code)] +struct RoutingRow { + id: Uuid, + configuration: sqlx::types::Json<RoutingConfiguration>, +} + +#[async_trait] +impl MutateRouting for AppHandle { + #[instrument(skip(self, request))] + async fn create_routing_configuration( + &self, + request: Request<RoutingConfiguration>, + ) -> Result<Response<RoutingConfiguration>, Status> { + let request = request.into_inner(); + let span = info_span!("create.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + sqlx::query!( + "insert into routing (id, configuration) values ($1, $2)", + Uuid::now_v7(), + sqlx::types::Json(&request) as _ + ) + .execute(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + Ok(tonic::Response::new(request)) + } + + async fn update_routing_configuration( + &self, + request: Request<UpdateRoutingRequest>, + ) -> Result<Response<RoutingConfiguration>, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + let id = Uuid::parse_str(&request.id) + .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?; + + let config = request.configuration.expect("configuration to be provided"); + + let span = info_span!("update.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "update"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + let updated = sqlx::query_as!( + RoutingRow, + r#" + update routing + set configuration = $1 + where id = $2 + returning id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>" + "#, + sqlx::types::Json(&config) as _, + id + ) + .fetch_one(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache(self, CacheKey::Routing(&id)), + publish_reload(self, conf, ReloadEvent::Routing) + )?; + + let res = updated.configuration.0; + + Ok(Response::new(res)) + } + + async fn delete_routing_configuration( + &self, + request: Request<DeleteConfigurationRequest>, + ) -> Result<Response<RoutingConfiguration>, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + let id = Uuid::parse_str(&request.id) + .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?; + + let span = info_span!("delete.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "delete"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + let updated = sqlx::query_as!( + RoutingRow, + r#" + delete from routing + where id = $1 + returning id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>" + "#, + id + ) + .fetch_one(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache(self, CacheKey::Routing(&id)), + publish_reload(self, conf, ReloadEvent::Routing) + )?; + + let res = updated.configuration.0; + + Ok(Response::new(res)) + } +} diff --git a/crates/configuration/src/state/routing/query.rs b/crates/configuration/src/state/routing/query_routing.rs index 3c6814d..3c6814d 100644 --- a/crates/configuration/src/state/routing/query.rs +++ b/crates/configuration/src/state/routing/query_routing.rs diff --git a/lib/warden-core/build.rs b/lib/warden-core/build.rs index 5f1e898..3992cd8 100644 --- a/lib/warden-core/build.rs +++ b/lib/warden-core/build.rs @@ -20,7 +20,10 @@ impl Entity { #[cfg(feature = "configuration")] fn configuration_protos() -> Vec<&'static str> { - vec!["proto/configuration/routing.proto"] + vec![ + "proto/configuration/routing.proto", + "proto/configuration/reload_event.proto", + ] } #[cfg(feature = "pseudonyms")] diff --git a/lib/warden-core/src/configuration.rs b/lib/warden-core/src/configuration.rs index da589c2..b620914 100644 --- a/lib/warden-core/src/configuration.rs +++ b/lib/warden-core/src/configuration.rs @@ -1,3 +1,5 @@ +tonic::include_proto!("configuration"); + pub mod routing { tonic::include_proto!("configuration.routing"); } diff --git a/lib/warden-middleware/Cargo.toml b/lib/warden-middleware/Cargo.toml index b23e61e..97c2c88 100644 --- a/lib/warden-middleware/Cargo.toml +++ b/lib/warden-middleware/Cargo.toml @@ -14,5 +14,6 @@ metrics.workspace = true metrics-exporter-prometheus.workspace = true tower-http = { workspace = true, features = [ "request-id", + "trace", ] } tracing.workspace = true diff --git a/lib/warden-middleware/src/trace_layer.rs b/lib/warden-middleware/src/trace_layer.rs index 5173e8d..5792c09 100644 --- a/lib/warden-middleware/src/trace_layer.rs +++ b/lib/warden-middleware/src/trace_layer.rs @@ -17,7 +17,8 @@ pub fn apply_trace_context_middleware<T: Clone + Send + Sync + 'static>( info_span!( "http_request", request_id = ?request_id, - headers = ?request.headers() + headers = ?request.headers(), + "otel.kind" = "server" ) }), ) diff --git a/proto/configuration/reload_event.proto b/proto/configuration/reload_event.proto new file mode 100644 index 0000000..a77645f --- /dev/null +++ b/proto/configuration/reload_event.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package configuration; + +enum ReloadEvent { + ROUTING = 0; +} |