diff options
Diffstat (limited to 'crates')
-rw-r--r-- | crates/configuration/Cargo.toml | 9 | ||||
-rw-r--r-- | crates/configuration/src/main.rs | 54 | ||||
-rw-r--r-- | crates/configuration/src/server.rs | 46 | ||||
-rw-r--r-- | crates/configuration/src/server/grpc_svc.rs | 24 | ||||
-rw-r--r-- | crates/configuration/src/server/http_svc.rs | 2 | ||||
-rw-r--r-- | crates/configuration/src/server/http_svc/routes.rs | 2 | ||||
-rw-r--r-- | crates/configuration/src/server/http_svc/routes/routing/get_active.rs | 14 | ||||
-rw-r--r-- | crates/configuration/src/server/interceptor.rs | 23 | ||||
-rw-r--r-- | crates/configuration/src/server/reload_stream.rs | 21 | ||||
-rw-r--r-- | crates/configuration/src/state.rs | 44 | ||||
-rw-r--r-- | crates/configuration/src/state/routing.rs | 3 | ||||
-rw-r--r-- | crates/configuration/src/state/routing/mutate_routing.rs | 150 | ||||
-rw-r--r-- | crates/configuration/src/state/routing/query_routing.rs (renamed from crates/configuration/src/state/routing/query.rs) | 0 |
13 files changed, 319 insertions, 73 deletions
diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index f9b722f..b290f08 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -10,11 +10,12 @@ description.workspace = true [dependencies] anyhow.workspace = true async-nats.workspace = true -axum.workspace = true +axum = { workspace = true, features = ["macros"] } clap = { workspace = true, features = ["derive"] } config = { workspace = true, features = ["convert-case", "toml"] } metrics.workspace = true metrics-exporter-prometheus.workspace = true +opentelemetry.workspace = true opentelemetry-semantic-conventions.workspace = true prost.workspace = true serde = { workspace = true, features = ["derive"] } @@ -26,12 +27,14 @@ sqlx = { workspace = true, features = [ "runtime-tokio", "time", "tls-rustls", + "uuid", ] } time.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tonic.workspace = true tonic-reflection.workspace = true tower = { workspace = true, features = ["steer"] } +tower-http = { workspace = true, features = ["trace"] } tracing.workspace = true tracing-opentelemetry.workspace = true utoipa = { workspace = true, features = ["axum_extras"] } @@ -40,8 +43,8 @@ utoipa-rapidoc = { workspace = true, optional = true } utoipa-redoc = { workspace = true, optional = true } utoipa-scalar = { workspace = true, optional = true } utoipa-swagger-ui = { workspace = true, optional = true } -uuid = { workspace = true, features = ["serde"] } -warden-core = { workspace = true, features = ["configuration", "serde-time"] } +uuid = { workspace = true, features = ["serde", "v7"] } +warden-core = { workspace = true, features = ["configuration", "openapi", "serde-time"] } warden-middleware.workspace = true [features] diff --git a/crates/configuration/src/main.rs b/crates/configuration/src/main.rs index 7eb5e3b..7dc8da6 100644 --- a/crates/configuration/src/main.rs +++ b/crates/configuration/src/main.rs @@ -2,9 +2,13 @@ mod cnfg; mod server; mod state; +use std::net::{Ipv6Addr, SocketAddr}; + +use crate::{server::error::AppError, state::AppState}; +use axum::http::header::CONTENT_TYPE; use clap::Parser; -use tracing::error; -use warden_config::state::AppState; +use tower::{make::Shared, steer::Steer}; +use tracing::{error, info}; use warden_stack::{Configuration, Services, tracing::Tracing}; /// warden-config @@ -17,7 +21,7 @@ struct Args { } #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<(), AppError> { let args = Args::parse(); let config = include_str!("../warden-config.toml"); @@ -41,7 +45,7 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(tracing.loki_task); - let services = Services::builder() + let mut services = Services::builder() .postgres(&config.database) .await .inspect_err(|e| error!("database: {e}"))? @@ -68,9 +72,43 @@ async fn main() -> anyhow::Result<()> { .take() .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; - let state = AppState::new(services, config, Some(provider)) - .await - .inspect_err(|e| error!("{e}"))?; + let state = AppState::create( + crate::state::Services { + postgres, + cache, + jetstream, + }, + &config, + ) + .await?; + + let (app, grpc_server) = server::serve(state)?; + + let service = Steer::new( + vec![app, grpc_server], + |req: &axum::extract::Request, _services: &[_]| { + if req + .headers() + .get(CONTENT_TYPE) + .map(|content_type| content_type.as_bytes()) + .filter(|content_type| content_type.starts_with(b"application/grpc")) + .is_some() + { + // grpc service + 1 + } else { + // http service + 0 + } + }, + ); + + let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port)); + + let listener = tokio::net::TcpListener::bind(addr).await?; + info!(port = addr.port(), "starting config-api"); + + axum::serve(listener, Shared::new(service)).await?; - server::serve(state); + Ok(()) } diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs index 1131144..dec0813 100644 --- a/crates/configuration/src/server.rs +++ b/crates/configuration/src/server.rs @@ -1,13 +1,13 @@ -mod error; +pub mod error; +pub mod grpc_svc; mod http_svc; -mod interceptor; +pub mod reload_stream; mod version; -use axum::http::header::CONTENT_TYPE; +use grpc_svc::interceptor::MyInterceptor; use http_svc::build_router; -use interceptor::MyInterceptor; use tonic::service::Routes; -use tower::{make::Shared, steer::Steer}; +use tower_http::trace::TraceLayer; use warden_core::{ FILE_DESCRIPTOR_SET, configuration::routing::{ @@ -15,9 +15,9 @@ use warden_core::{ }, }; -use crate::state::AppHandle; +use crate::{server::error::AppError, state::AppHandle}; -pub async fn serve<S>(state: AppHandle) -> anyhow::Result<Shared<S>> { +pub fn serve(state: AppHandle) -> Result<(axum::Router, axum::Router), AppError> { let app = build_router(state.clone()); let service = QueryRoutingServer::with_interceptor(state.clone(), MyInterceptor); @@ -32,26 +32,14 @@ pub async fn serve<S>(state: AppHandle) -> anyhow::Result<Shared<S>> { MyInterceptor, )) .add_service(routing_reflector) - .into_axum_router(); - - let service = Steer::new( - vec![app, grpc_server], - |req: &axum::extract::Request, _services: &[_]| { - if req - .headers() - .get(CONTENT_TYPE) - .map(|content_type| content_type.as_bytes()) - .filter(|content_type| content_type.starts_with(b"application/grpc")) - .is_some() - { - // grpc service - 1 - } else { - // http service - 0 - } - }, - ); - - Ok(Shared::new(service)) + .into_axum_router() + .layer( + TraceLayer::new_for_grpc().make_span_with(|request: &axum::http::Request<_>| { + tracing::trace_span!(env!("CARGO_PKG_NAME"), "otel.kind" = "server", + headers = ?request.headers() + ) + }), + ); + + Ok((app, grpc_server)) } diff --git a/crates/configuration/src/server/grpc_svc.rs b/crates/configuration/src/server/grpc_svc.rs new file mode 100644 index 0000000..42aa871 --- /dev/null +++ b/crates/configuration/src/server/grpc_svc.rs @@ -0,0 +1,24 @@ +pub mod interceptor { + use opentelemetry::global; + use tonic::{Status, service::Interceptor}; + use tracing::Span; + use tracing_opentelemetry::OpenTelemetrySpanExt; + use warden_stack::tracing::telemetry::tonic::extractor; + + #[derive(Clone, Copy)] + pub struct MyInterceptor; + + impl Interceptor for MyInterceptor { + fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + let span = Span::current(); + + let cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor::MetadataMap(request.metadata())) + }); + + span.set_parent(cx); + + Ok(request) + } + } +} diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs index 7b2a258..45cdcfa 100644 --- a/crates/configuration/src/server/http_svc.rs +++ b/crates/configuration/src/server/http_svc.rs @@ -8,7 +8,7 @@ use utoipa_redoc::Servable; #[cfg(feature = "scalar")] use utoipa_scalar::Servable as _; -use crate::{server::http_svc, state::AppHandle}; +use crate::state::AppHandle; const TAG_ROUTING: &str = "Routing"; diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs index cc065e8..e70dccd 100644 --- a/crates/configuration/src/server/http_svc/routes.rs +++ b/crates/configuration/src/server/http_svc/routes.rs @@ -6,6 +6,6 @@ use crate::state::AppHandle; pub fn router(store: AppHandle) -> OpenApiRouter { OpenApiRouter::new() - .routes(routes!(routing::get_active)) + .routes(routes!(routing::get_active::active_routing)) .with_state(store) } diff --git a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs index 562a1f1..9dc22e3 100644 --- a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs +++ b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs @@ -1,5 +1,9 @@ use axum::{extract::State, response::IntoResponse}; -use warden_core::configuration::routing::RoutingConfiguration; +use tonic::IntoRequest; +use warden_core::{ + configuration::routing::{RoutingConfiguration, query_routing_server::QueryRouting}, + google, +}; use crate::{ server::{error::AppError, http_svc::TAG_ROUTING, version::Version}, @@ -22,9 +26,13 @@ use crate::{ ] #[axum::debug_handler] #[tracing::instrument(skip(state), err(Debug), fields(method = "GET"))] -pub(super) async fn active_routing( +pub async fn active_routing( version: Version, State(state): State<AppHandle>, ) -> Result<impl IntoResponse, AppError> { - Ok(String::default().into_response()) + let config = state + .get_active_routing_configuration(google::protobuf::Empty::default().into_request()) + .await? + .into_inner(); + Ok(axum::Json(config.configuration).into_response()) } diff --git a/crates/configuration/src/server/interceptor.rs b/crates/configuration/src/server/interceptor.rs deleted file mode 100644 index eeb36c2..0000000 --- a/crates/configuration/src/server/interceptor.rs +++ /dev/null @@ -1,23 +0,0 @@ -use tonic::{Status, service::Interceptor}; -use tracing::Span; -use warden_stack::{ - opentelemetry::global, tracing::telemetry::tonic::extractor, - tracing_opentelemetry::OpenTelemetrySpanExt, -}; - -#[derive(Clone, Copy)] -pub struct MyInterceptor; - -impl Interceptor for MyInterceptor { - fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { - let span = Span::current(); - - let cx = global::get_text_map_propagator(|propagator| { - propagator.extract(&extractor::MetadataMap(request.metadata())) - }); - - span.set_parent(cx); - - Ok(request) - } -} diff --git a/crates/configuration/src/server/reload_stream.rs b/crates/configuration/src/server/reload_stream.rs new file mode 100644 index 0000000..d2ee4ab --- /dev/null +++ b/crates/configuration/src/server/reload_stream.rs @@ -0,0 +1,21 @@ +use async_nats::jetstream::Context; +use tracing::{debug, info}; + +use crate::cnfg::JetstreamConfig; + +pub async fn create_stream(jetstream: &Context, config: &JetstreamConfig) -> anyhow::Result<()> { + debug!(name = ?config.stream, "initialising stream"); + + jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: config.stream.to_string(), + max_messages: config.max_messages, + subjects: vec![config.subject.to_string()], + ..Default::default() + }) + .await?; + + info!(name = ?config.stream, subject = ?config.subject, "stream is ready"); + + Ok(()) +} diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs index 7672891..5a51d5b 100644 --- a/crates/configuration/src/state.rs +++ b/crates/configuration/src/state.rs @@ -4,13 +4,14 @@ mod routing; use async_nats::jetstream::Context; use sqlx::PgPool; use std::{ops::Deref, sync::Arc}; -use tonic::transport::Endpoint; -use tracing::error; -use warden_stack::{Configuration, cache::RedisManager}; +use tracing::{instrument, trace}; +use warden_core::configuration::ReloadEvent; +use warden_stack::{Configuration, cache::RedisManager, redis::AsyncCommands}; use crate::{ cnfg::LocalConfig, - server::grpc::interceptor::{Intercepted, MyInterceptor}, + server::{error::AppError, reload_stream::create_stream}, + state::cache_key::CacheKey, }; #[derive(Clone)] @@ -43,9 +44,44 @@ impl AppState { ) -> Result<AppHandle, AppError> { let local_config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + create_stream(&services.jetstream, &local_config.nats).await?; + Ok(AppHandle(Arc::new(Self { services, app_config: local_config, }))) } } + +#[instrument(skip(state), err(Debug))] +pub async fn invalidate_cache(state: &AppHandle, key: CacheKey<'_>) -> Result<(), tonic::Status> { + trace!("invalidating cache"); + let mut cache = state + .services + .cache + .get() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + cache + .del::<_, ()>(key) + .await + .map_err(|e| tonic::Status::internal(e.to_string())) +} + +#[instrument(skip(state), err(Debug))] +pub async fn publish_reload( + state: &AppHandle, + prefix: &str, + event: ReloadEvent, +) -> Result<(), tonic::Status> { + trace!("publishing reload event"); + state + .services + .jetstream + .publish(format!("{prefix}.reload"), event.as_str_name().into()) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + Ok(()) +} diff --git a/crates/configuration/src/state/routing.rs b/crates/configuration/src/state/routing.rs index ea51c17..d71af04 100644 --- a/crates/configuration/src/state/routing.rs +++ b/crates/configuration/src/state/routing.rs @@ -1 +1,2 @@ -mod query; +mod mutate_routing; +mod query_routing; diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs new file mode 100644 index 0000000..0c0637a --- /dev/null +++ b/crates/configuration/src/state/routing/mutate_routing.rs @@ -0,0 +1,150 @@ +use opentelemetry_semantic_conventions::attribute; +use tonic::{Request, Response, Status, async_trait}; +use tracing::{Instrument, error, info_span, instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; +use warden_core::configuration::{ + ReloadEvent, + routing::{ + DeleteConfigurationRequest, RoutingConfiguration, UpdateRoutingRequest, + mutate_routing_server::MutateRouting, + }, +}; + +use crate::state::{AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload}; + +#[allow(dead_code)] +struct RoutingRow { + id: Uuid, + configuration: sqlx::types::Json<RoutingConfiguration>, +} + +#[async_trait] +impl MutateRouting for AppHandle { + #[instrument(skip(self, request))] + async fn create_routing_configuration( + &self, + request: Request<RoutingConfiguration>, + ) -> Result<Response<RoutingConfiguration>, Status> { + let request = request.into_inner(); + let span = info_span!("create.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + sqlx::query!( + "insert into routing (id, configuration) values ($1, $2)", + Uuid::now_v7(), + sqlx::types::Json(&request) as _ + ) + .execute(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + Ok(tonic::Response::new(request)) + } + + async fn update_routing_configuration( + &self, + request: Request<UpdateRoutingRequest>, + ) -> Result<Response<RoutingConfiguration>, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + let id = Uuid::parse_str(&request.id) + .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?; + + let config = request.configuration.expect("configuration to be provided"); + + let span = info_span!("update.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "update"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + let updated = sqlx::query_as!( + RoutingRow, + r#" + update routing + set configuration = $1 + where id = $2 + returning id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>" + "#, + sqlx::types::Json(&config) as _, + id + ) + .fetch_one(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache(self, CacheKey::Routing(&id)), + publish_reload(self, conf, ReloadEvent::Routing) + )?; + + let res = updated.configuration.0; + + Ok(Response::new(res)) + } + + async fn delete_routing_configuration( + &self, + request: Request<DeleteConfigurationRequest>, + ) -> Result<Response<RoutingConfiguration>, Status> { + let conf = self + .app_config + .nats + .subject + .split(".") + .next() + .expect("bad config"); + + let request = request.into_inner(); + let id = Uuid::parse_str(&request.id) + .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?; + + let span = info_span!("delete.configuration.routing"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "delete"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + + let updated = sqlx::query_as!( + RoutingRow, + r#" + delete from routing + where id = $1 + returning id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>" + "#, + id + ) + .fetch_one(&self.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + tonic::Status::internal(e.to_string()) + })?; + + let (_del_result, _publish_result) = tokio::try_join!( + invalidate_cache(self, CacheKey::Routing(&id)), + publish_reload(self, conf, ReloadEvent::Routing) + )?; + + let res = updated.configuration.0; + + Ok(Response::new(res)) + } +} diff --git a/crates/configuration/src/state/routing/query.rs b/crates/configuration/src/state/routing/query_routing.rs index 3c6814d..3c6814d 100644 --- a/crates/configuration/src/state/routing/query.rs +++ b/crates/configuration/src/state/routing/query_routing.rs |