diff options
Diffstat (limited to 'crates/configuration/src')
-rw-r--r-- | crates/configuration/src/cnfg.rs | 16 | ||||
-rw-r--r-- | crates/configuration/src/main.rs | 77 | ||||
-rw-r--r-- | crates/configuration/src/server.rs | 57 | ||||
-rw-r--r-- | crates/configuration/src/server/error.rs | 26 | ||||
-rw-r--r-- | crates/configuration/src/server/http_svc.rs | 63 | ||||
-rw-r--r-- | crates/configuration/src/server/http_svc/routes.rs | 11 | ||||
-rw-r--r-- | crates/configuration/src/server/http_svc/routes/routing.rs | 4 | ||||
-rw-r--r-- | crates/configuration/src/server/http_svc/routes/routing/get_active.rs | 30 | ||||
-rw-r--r-- | crates/configuration/src/server/interceptor.rs | 23 | ||||
-rw-r--r-- | crates/configuration/src/server/version.rs | 35 | ||||
-rw-r--r-- | crates/configuration/src/state.rs | 51 | ||||
-rw-r--r-- | crates/configuration/src/state/cache_key.rs | 21 | ||||
-rw-r--r-- | crates/configuration/src/state/routing.rs | 1 | ||||
-rw-r--r-- | crates/configuration/src/state/routing/query.rs | 106 |
14 files changed, 519 insertions, 2 deletions
diff --git a/crates/configuration/src/cnfg.rs b/crates/configuration/src/cnfg.rs new file mode 100644 index 0000000..a68b740 --- /dev/null +++ b/crates/configuration/src/cnfg.rs @@ -0,0 +1,16 @@ +use std::sync::Arc; + +use serde::Deserialize; + +#[derive(Deserialize, Clone)] +pub struct LocalConfig { + pub nats: JetstreamConfig, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct JetstreamConfig { + pub stream: Arc<str>, + pub max_messages: i64, + pub subject: Arc<str>, +} diff --git a/crates/configuration/src/main.rs b/crates/configuration/src/main.rs index e7a11a9..7eb5e3b 100644 --- a/crates/configuration/src/main.rs +++ b/crates/configuration/src/main.rs @@ -1,3 +1,76 @@ -fn main() { - println!("Hello, world!"); +mod cnfg; +mod server; +mod state; + +use clap::Parser; +use tracing::error; +use warden_config::state::AppState; +use warden_stack::{Configuration, Services, tracing::Tracing}; + +/// warden-config +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option<std::path::PathBuf>, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + let config = include_str!("../warden-config.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder() + .opentelemetry(&config.application, &config.monitoring)? + .loki(&config.application, &config.monitoring)? + .build(&config.monitoring); + + let provider = tracing.otel_provider; + + tokio::spawn(tracing.loki_task); + + let services = Services::builder() + .postgres(&config.database) + .await + .inspect_err(|e| error!("database: {e}"))? + .nats_jetstream(&config.nats) + .await + .inspect_err(|e| error!("nats: {e}"))? + .cache(&config.cache) + .await + .inspect_err(|e| error!("cache: {e}"))? + .build(); + + let postgres = services + .postgres + .take() + .ok_or_else(|| anyhow::anyhow!("database is not ready"))?; + + let cache = services + .cache + .take() + .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + + let jetstream = services + .jetstream + .take() + .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; + + let state = AppState::new(services, config, Some(provider)) + .await + .inspect_err(|e| error!("{e}"))?; + + server::serve(state); } diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs new file mode 100644 index 0000000..1131144 --- /dev/null +++ b/crates/configuration/src/server.rs @@ -0,0 +1,57 @@ +mod error; +mod http_svc; +mod interceptor; +mod version; + +use axum::http::header::CONTENT_TYPE; +use http_svc::build_router; +use interceptor::MyInterceptor; +use tonic::service::Routes; +use tower::{make::Shared, steer::Steer}; +use warden_core::{ + FILE_DESCRIPTOR_SET, + configuration::routing::{ + mutate_routing_server::MutateRoutingServer, query_routing_server::QueryRoutingServer, + }, +}; + +use crate::state::AppHandle; + +pub async fn serve<S>(state: AppHandle) -> anyhow::Result<Shared<S>> { + let app = build_router(state.clone()); + + let service = QueryRoutingServer::with_interceptor(state.clone(), MyInterceptor); + + let routing_reflector = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build_v1()?; + + let grpc_server = Routes::new(service) + .add_service(MutateRoutingServer::with_interceptor( + state.clone(), + MyInterceptor, + )) + .add_service(routing_reflector) + .into_axum_router(); + + let service = Steer::new( + vec![app, grpc_server], + |req: &axum::extract::Request, _services: &[_]| { + if req + .headers() + .get(CONTENT_TYPE) + .map(|content_type| content_type.as_bytes()) + .filter(|content_type| content_type.starts_with(b"application/grpc")) + .is_some() + { + // grpc service + 1 + } else { + // http service + 0 + } + }, + ); + + Ok(Shared::new(service)) +} diff --git a/crates/configuration/src/server/error.rs b/crates/configuration/src/server/error.rs new file mode 100644 index 0000000..730f99a --- /dev/null +++ b/crates/configuration/src/server/error.rs @@ -0,0 +1,26 @@ +use axum::{ + http::StatusCode, + response::{IntoResponse, Response}, +}; + +#[derive(Debug)] +pub struct AppError(anyhow::Error); + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Something went wrong: {}", self.0), + ) + .into_response() + } +} + +impl<E> From<E> for AppError +where + E: Into<anyhow::Error>, +{ + fn from(err: E) -> Self { + Self(err.into()) + } +} diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs new file mode 100644 index 0000000..7b2a258 --- /dev/null +++ b/crates/configuration/src/server/http_svc.rs @@ -0,0 +1,63 @@ +mod routes; + +use axum::{Router, response::IntoResponse}; +use utoipa::OpenApi; +use utoipa_axum::router::OpenApiRouter; +#[cfg(feature = "redoc")] +use utoipa_redoc::Servable; +#[cfg(feature = "scalar")] +use utoipa_scalar::Servable as _; + +use crate::{server::http_svc, state::AppHandle}; + +const TAG_ROUTING: &str = "Routing"; + +#[derive(OpenApi)] +#[openapi( + tags( + (name = TAG_ROUTING, description = "Operations related to routing configuration"), + ) +)] +pub struct ApiDoc; + +/// Get health of the API. +#[utoipa::path( + method(get), + path = "/", + responses( + (status = OK, description = "Success", body = str, content_type = "text/plain") + ) +)] +pub async fn health_check() -> impl IntoResponse { + let name = env!("CARGO_PKG_NAME"); + let ver = env!("CARGO_PKG_VERSION"); + + format!("{name} v{ver} is live") +} + +pub fn build_router(state: AppHandle) -> Router { + let (router, _api) = OpenApiRouter::with_openapi(ApiDoc::openapi()) + .routes(utoipa_axum::routes!(health_check)) + .nest("/api", routes::router(state)) + .split_for_parts(); + + #[cfg(feature = "swagger")] + let router = router.merge( + utoipa_swagger_ui::SwaggerUi::new("/swagger-ui") + .url("/api-docs/swaggerdoc.json", _api.clone()), + ); + + #[cfg(feature = "redoc")] + let router = router.merge(utoipa_redoc::Redoc::with_url("/redoc", _api.clone())); + + #[cfg(feature = "rapidoc")] + let router = router.merge( + utoipa_rapidoc::RapiDoc::with_openapi("/api-docs/rapidoc.json", _api.clone()) + .path("/rapidoc"), + ); + + #[cfg(feature = "scalar")] + let router = router.merge(utoipa_scalar::Scalar::with_url("/scalar", _api)); + + warden_middleware::apply(router) +} diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs new file mode 100644 index 0000000..cc065e8 --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes.rs @@ -0,0 +1,11 @@ +mod routing; + +use utoipa_axum::{router::OpenApiRouter, routes}; + +use crate::state::AppHandle; + +pub fn router(store: AppHandle) -> OpenApiRouter { + OpenApiRouter::new() + .routes(routes!(routing::get_active)) + .with_state(store) +} diff --git a/crates/configuration/src/server/http_svc/routes/routing.rs b/crates/configuration/src/server/http_svc/routes/routing.rs new file mode 100644 index 0000000..7c8affe --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes/routing.rs @@ -0,0 +1,4 @@ +//pub mod delete_routing; +pub mod get_active; +//pub mod post_routing; +//pub mod replace_routing; diff --git a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs new file mode 100644 index 0000000..562a1f1 --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs @@ -0,0 +1,30 @@ +use axum::{extract::State, response::IntoResponse}; +use warden_core::configuration::routing::RoutingConfiguration; + +use crate::{ + server::{error::AppError, http_svc::TAG_ROUTING, version::Version}, + state::AppHandle, +}; + +#[utoipa::path( + get, + responses(( + status = OK, + body = RoutingConfiguration + )), + operation_id = "get_active_routing", // https://github.com/juhaku/utoipa/issues/1170 + path = "/{version}/routing", + params( + ("version" = Version, Path, description = "API version, e.g., v1, v2, v3") + ), + tag = TAG_ROUTING, + ) +] +#[axum::debug_handler] +#[tracing::instrument(skip(state), err(Debug), fields(method = "GET"))] +pub(super) async fn active_routing( + version: Version, + State(state): State<AppHandle>, +) -> Result<impl IntoResponse, AppError> { + Ok(String::default().into_response()) +} diff --git a/crates/configuration/src/server/interceptor.rs b/crates/configuration/src/server/interceptor.rs new file mode 100644 index 0000000..eeb36c2 --- /dev/null +++ b/crates/configuration/src/server/interceptor.rs @@ -0,0 +1,23 @@ +use tonic::{Status, service::Interceptor}; +use tracing::Span; +use warden_stack::{ + opentelemetry::global, tracing::telemetry::tonic::extractor, + tracing_opentelemetry::OpenTelemetrySpanExt, +}; + +#[derive(Clone, Copy)] +pub struct MyInterceptor; + +impl Interceptor for MyInterceptor { + fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + let span = Span::current(); + + let cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor::MetadataMap(request.metadata())) + }); + + span.set_parent(cx); + + Ok(request) + } +} diff --git a/crates/configuration/src/server/version.rs b/crates/configuration/src/server/version.rs new file mode 100644 index 0000000..4eb5677 --- /dev/null +++ b/crates/configuration/src/server/version.rs @@ -0,0 +1,35 @@ +use std::collections::HashMap; + +use axum::{ + RequestPartsExt, + extract::{FromRequestParts, Path}, + http::{StatusCode, request::Parts}, + response::{IntoResponse, Response}, +}; +use utoipa::ToSchema; + +#[derive(Debug, ToSchema)] +pub enum Version { + V0, +} + +impl<S> FromRequestParts<S> for Version +where + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> { + let params: Path<HashMap<String, String>> = + parts.extract().await.map_err(IntoResponse::into_response)?; + + let version = params + .get("version") + .ok_or_else(|| (StatusCode::NOT_FOUND, "version param missing").into_response())?; + + match version.as_str() { + "v0" => Ok(Version::V0), + _ => Err((StatusCode::NOT_FOUND, "unknown version").into_response()), + } + } +} diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs new file mode 100644 index 0000000..7672891 --- /dev/null +++ b/crates/configuration/src/state.rs @@ -0,0 +1,51 @@ +mod cache_key; +mod routing; + +use async_nats::jetstream::Context; +use sqlx::PgPool; +use std::{ops::Deref, sync::Arc}; +use tonic::transport::Endpoint; +use tracing::error; +use warden_stack::{Configuration, cache::RedisManager}; + +use crate::{ + cnfg::LocalConfig, + server::grpc::interceptor::{Intercepted, MyInterceptor}, +}; + +#[derive(Clone)] +pub struct AppHandle(Arc<AppState>); + +impl Deref for AppHandle { + type Target = Arc<AppState>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[derive(Clone)] +pub struct Services { + pub postgres: PgPool, + pub cache: RedisManager, + pub jetstream: Context, +} + +pub struct AppState { + pub services: Services, + pub app_config: LocalConfig, +} + +impl AppState { + pub async fn create( + services: Services, + configuration: &Configuration, + ) -> Result<AppHandle, AppError> { + let local_config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + + Ok(AppHandle(Arc::new(Self { + services, + app_config: local_config, + }))) + } +} diff --git a/crates/configuration/src/state/cache_key.rs b/crates/configuration/src/state/cache_key.rs new file mode 100644 index 0000000..a63b15d --- /dev/null +++ b/crates/configuration/src/state/cache_key.rs @@ -0,0 +1,21 @@ +use warden_stack::redis::ToRedisArgs; + +#[derive(Clone, Copy, Debug)] +pub enum CacheKey<'a> { + ActiveRouting, + Routing(&'a uuid::Uuid), +} + +impl ToRedisArgs for CacheKey<'_> { + fn write_redis_args<W>(&self, out: &mut W) + where + W: ?Sized + warden_stack::redis::RedisWrite, + { + let value = match self { + CacheKey::ActiveRouting => "routing.active".into(), + CacheKey::Routing(uuid) => format!("routing.{uuid}"), + }; + + out.write_arg(value.as_bytes()); + } +} diff --git a/crates/configuration/src/state/routing.rs b/crates/configuration/src/state/routing.rs new file mode 100644 index 0000000..ea51c17 --- /dev/null +++ b/crates/configuration/src/state/routing.rs @@ -0,0 +1 @@ +mod query; diff --git a/crates/configuration/src/state/routing/query.rs b/crates/configuration/src/state/routing/query.rs new file mode 100644 index 0000000..3c6814d --- /dev/null +++ b/crates/configuration/src/state/routing/query.rs @@ -0,0 +1,106 @@ +use opentelemetry_semantic_conventions::attribute; +use prost::Message; +use tonic::{Request, Status}; +use tracing::{Instrument, debug, info_span, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_stack::redis::AsyncCommands; + +use uuid::Uuid; +use warden_core::{ + configuration::routing::{ + GetActiveRoutingResponse, RoutingConfiguration, query_routing_server::QueryRouting, + }, + google, +}; + +use crate::state::{AppHandle, cache_key::CacheKey}; + +pub struct RoutingRow { + id: Uuid, + configuration: sqlx::types::Json<RoutingConfiguration>, +} + +#[tonic::async_trait] +impl QueryRouting for AppHandle { + #[instrument(skip(self, _request), Err(Debug))] + async fn get_active_routing_configuration( + &self, + _request: Request<google::protobuf::Empty>, + ) -> Result<tonic::Response<GetActiveRoutingResponse>, Status> { + let mut cache = self + .services + .cache + .get() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let span = info_span!("cache.get"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "get"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + let routing_config = cache + .get::<_, Vec<u8>>(CacheKey::ActiveRouting) + .instrument(span) + .await + .map(|value| { + if !value.is_empty() { + RoutingConfiguration::decode(value.as_ref()).ok() + } else { + None + } + }); + + if let Ok(Some(routing_config)) = routing_config { + if routing_config.active { + return Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(routing_config), + })); + } + } + + let span = info_span!("db.get.routing.active"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "select"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + + let config = sqlx::query_as!( + RoutingRow, + r#"select id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>" from routing where + configuration->>'active' = 'true'"#, + ) + .fetch_optional(&self.services.postgres) + .instrument(span) + .await.map_err(|e| tonic::Status::internal(e.to_string()))?; + + let config = config.map(|transaction| { + debug!(id = ?transaction.id, "found active config"); + transaction.configuration.0 + }); + + match config { + Some(config) => { + let bytes = config.encode_to_vec(); + let span = info_span!("cache.set"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "set"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active"); + + if let Err(e) = cache + .set::<_, _, ()>(CacheKey::ActiveRouting, bytes) + .instrument(span) + .await + { + warn!("{e}"); + }; + + Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(config), + })) + } + None => Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: None, + })), + } + } +} |