diff options
Diffstat (limited to 'crates/router/src')
-rw-r--r-- | crates/router/src/cnfg.rs | 32 | ||||
-rw-r--r-- | crates/router/src/main.rs | 62 | ||||
-rw-r--r-- | crates/router/src/processor.rs | 108 | ||||
-rw-r--r-- | crates/router/src/processor/grpc.rs | 27 | ||||
-rw-r--r-- | crates/router/src/processor/load.rs | 51 | ||||
-rw-r--r-- | crates/router/src/processor/publish.rs | 51 | ||||
-rw-r--r-- | crates/router/src/processor/reload.rs | 58 | ||||
-rw-r--r-- | crates/router/src/processor/route.rs | 91 | ||||
-rw-r--r-- | crates/router/src/state.rs | 55 |
9 files changed, 535 insertions, 0 deletions
diff --git a/crates/router/src/cnfg.rs b/crates/router/src/cnfg.rs new file mode 100644 index 0000000..3f6e563 --- /dev/null +++ b/crates/router/src/cnfg.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; + +use serde::Deserialize; + +pub const CACHE_KEY: i32 = 0; + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct LocalConfig { + pub config_endpoint: Arc<str>, + pub nats: Nats, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct Nats { + #[serde(rename = "stream-name")] + pub name: Arc<str>, + pub subjects: Arc<[String]>, + pub destination_prefix: Arc<str>, + pub max_messages: i64, + pub durable_name: Arc<str>, + pub config: ConfigNats, +} + +#[derive(Deserialize, Clone)] +pub struct ConfigNats { + #[serde(rename = "stream")] + pub stream: Arc<str>, + #[serde(rename = "reload-subject")] + pub reload_subject: Arc<str>, +} diff --git a/crates/router/src/main.rs b/crates/router/src/main.rs new file mode 100644 index 0000000..cc4c927 --- /dev/null +++ b/crates/router/src/main.rs @@ -0,0 +1,62 @@ +mod cnfg; +mod processor; +mod state; + +use anyhow::Result; +use clap::Parser; +use tracing::error; +use warden_stack::{Configuration, Services, tracing::Tracing}; + +/// warden-router +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option<std::path::PathBuf>, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + let config = include_str!("../router.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder() + .opentelemetry(&config.application, &config.monitoring)? + .loki(&config.application, &config.monitoring)? + .build(&config.monitoring); + + let provider = tracing.otel_provider; + + tokio::spawn(tracing.loki_task); + + let mut services = Services::builder() + .nats_jetstream(&config.nats) + .await + .inspect_err(|e| error!("nats: {e}"))? + .build(); + + let jetstream = services + .jetstream + .take() + .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; + + let services = state::Services { + jetstream, + }; + + processor::serve(services, config, provider) + .await + .inspect_err(|e| error!("{e}")) +} diff --git a/crates/router/src/processor.rs b/crates/router/src/processor.rs new file mode 100644 index 0000000..b8c69f3 --- /dev/null +++ b/crates/router/src/processor.rs @@ -0,0 +1,108 @@ +pub mod grpc; +mod reload; +mod route; +mod publish; +mod load; + +use std::sync::Arc; + +use anyhow::Result; +use async_nats::jetstream::{consumer::{pull, Consumer}, Context}; +use tokio::signal; +use tracing::{error, trace}; +use warden_stack::{Configuration, tracing::SdkTracerProvider}; +use futures_util::StreamExt; + +use crate::{cnfg::Nats, state::{AppHandle, AppState, Services}}; + + +pub async fn serve( + services: Services, + config: Configuration, + provider: SdkTracerProvider, +) -> anyhow::Result<()> { + let state = Arc::new(AppState::new(services, config).await?); + + tokio::select! { + _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {} + _ = shutdown_signal(provider) => {} + }; + + Ok(()) +} + + +async fn run(state: AppHandle) -> anyhow::Result<()> { + let config = Arc::clone(&state); + let (consumer, _) = tokio::join!( + get_or_create_stream(&state.services.jetstream, &state.config.nats), + load::get_routing_config(Arc::clone(&config)) + ); + + let consumer = consumer?; + + // Consume messages from the consumer + while let Some(Ok(message)) = consumer.messages().await?.next().await { + if let Err(e) = route::route(message, Arc::clone(&state)).await { + error!("{}", e.to_string()); + } + } + + Ok(()) +} + +async fn get_or_create_stream( + jetstream: &Context, + nats: &Nats, +) -> anyhow::Result<Consumer<pull::Config>> { + trace!(name = ?nats.name, "getting or creating stream"); + let stream = jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: nats.name.to_string(), + subjects: nats.subjects.iter().map(Into::into).collect(), + max_messages: nats.max_messages, + ..Default::default() + }) + .await?; + let durable = nats.durable_name.to_string(); + // Get or create a pull-based consumer + Ok(stream + .get_or_create_consumer( + durable.as_ref(), + async_nats::jetstream::consumer::pull::Config { + durable_name: Some(durable.to_string()), + ..Default::default() + }, + ) + .await?) +} + + +async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + }, + _ = terminate => { + }, + } + let _ = provider.shutdown(); + + Ok(()) +} diff --git a/crates/router/src/processor/grpc.rs b/crates/router/src/processor/grpc.rs new file mode 100644 index 0000000..344f2a1 --- /dev/null +++ b/crates/router/src/processor/grpc.rs @@ -0,0 +1,27 @@ +pub mod interceptor { + use opentelemetry::global; + use tonic::{ + Status, + service::{Interceptor, interceptor::InterceptedService}, + transport::Channel, + }; + use tracing::Span; + use tracing_opentelemetry::OpenTelemetrySpanExt; + use warden_stack::tracing::telemetry::tonic::injector; + + pub type Intercepted = InterceptedService<Channel, MyInterceptor>; + + #[derive(Clone, Copy)] + pub struct MyInterceptor; + + impl Interceptor for MyInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + let cx = Span::current().context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::MetadataMap(request.metadata_mut())) + }); + + Ok(request) + } + } +} diff --git a/crates/router/src/processor/load.rs b/crates/router/src/processor/load.rs new file mode 100644 index 0000000..9d3fd0d --- /dev/null +++ b/crates/router/src/processor/load.rs @@ -0,0 +1,51 @@ +use opentelemetry_semantic_conventions::attribute; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{Instrument, debug, info, info_span, instrument, warn}; +use warden_core::{configuration::routing::RoutingConfiguration, google }; + +use crate::{cnfg::CACHE_KEY, state::AppHandle}; + +#[instrument(skip(state))] +pub async fn get_routing_config(state: AppHandle) -> Option<RoutingConfiguration> { + debug!("getting routing config"); + { + let span = info_span!("local_cache.get"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "moka"); + let local_cache = state.local_cache.read().await; + if let Some(value) = local_cache.get(&CACHE_KEY).await { + return Some(value); + } + } + + let mut client = state.query_routing_client.clone(); + + let span = info_span!("get.routing.config"); + span.set_attribute(attribute::RPC_SERVICE, env!("CARGO_PKG_NAME")); + span.set_attribute("otel.kind", "client"); + + if let Ok(config) = client + .get_active_routing_configuration(google::protobuf::Empty::default()) + .instrument(span) + .await + { + debug!("fetched routing config"); + let span = info_span!("local_cache.insert"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "moka"); + if let Some(config) = config.into_inner().configuration { + debug!("updating cache"); + let local_cache = state.local_cache.write().await; + local_cache + .insert(CACHE_KEY, config.clone()) + .instrument(span) + .await; + info!("cache refreshed"); + return Some(config); + } else { + warn!("no routing config is active"); + return None; + } + } else { + warn!("no routing config is active"); + return None; + } +} diff --git a/crates/router/src/processor/publish.rs b/crates/router/src/processor/publish.rs new file mode 100644 index 0000000..16dcec8 --- /dev/null +++ b/crates/router/src/processor/publish.rs @@ -0,0 +1,51 @@ +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{Instrument, Span, debug, info, info_span, warn}; +use warden_core::{configuration::routing::RoutingConfiguration, message::Payload}; +use warden_stack::tracing::telemetry::nats::injector; + +use crate::state::AppHandle; + +pub(crate) async fn to_rule( + (rule_id, rule_version): (&String, &str), + state: AppHandle, + mut payload: Payload, + routing: RoutingConfiguration, +) -> anyhow::Result<()> { + // send transaction to next with nats + let subject = format!( + "{}.{rule_id}.v{rule_version}", + state.config.nats.destination_prefix + ); + debug!(subject = ?subject, "publishing"); + + payload.routing = Some(routing); + + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + state + .services + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .instrument(span) + .await + .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?; + + info!("published to rule"); + + Ok(()) +} diff --git a/crates/router/src/processor/reload.rs b/crates/router/src/processor/reload.rs new file mode 100644 index 0000000..c75465c --- /dev/null +++ b/crates/router/src/processor/reload.rs @@ -0,0 +1,58 @@ +use futures_util::StreamExt; +use async_nats::jetstream::consumer; +use tracing::{trace, debug, error, info}; +use uuid::Uuid; +use warden_core::configuration::ReloadEvent; + +use crate::state::AppHandle; + +pub async fn reload(state: AppHandle) -> anyhow::Result<()> { + let id = Uuid::now_v7().to_string(); + info!(durable = id, "listening for configuration changes"); + + let durable = &id; + let consumer = state + .services + .jetstream + .get_stream(state.config.nats.config.stream.to_string()) + .await? + .get_or_create_consumer( + durable, + consumer::pull::Config { + durable_name: Some(durable.to_string()), + filter_subject: state.config.nats.config.reload_subject.to_string(), + deliver_policy: consumer::DeliverPolicy::LastPerSubject, + ..Default::default() + }, + ) + .await?; + + let mut messages = consumer.messages().await?; + while let Some(value) = messages.next().await { + match value { + Ok(message) => { + trace!("got reload cache event",); + if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec()) + .map(|value| ReloadEvent::from_str_name(&value)) + { + match event { + ReloadEvent::Routing => { + let local_cache = state.local_cache.write().await; + local_cache.invalidate_all(); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + _ => { + debug!(event = ?event, "detected reload event, acknowledging"); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + } + } + } + Err(e) => { + error!("{e}") + } + } + } + + Ok(()) +} diff --git a/crates/router/src/processor/route.rs b/crates/router/src/processor/route.rs new file mode 100644 index 0000000..404c2ca --- /dev/null +++ b/crates/router/src/processor/route.rs @@ -0,0 +1,91 @@ +use anyhow::Result; +use std::{collections::HashSet, sync::Arc}; + +use opentelemetry::global; +use prost::Message; +use tracing::{info_span, instrument, trace, trace_span, warn, Instrument, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::{google, message::Payload}; +use warden_stack::tracing::telemetry::nats; + +use crate::{cnfg::CACHE_KEY, processor::publish, state::AppHandle}; + +#[instrument(skip(message, state), err(Debug), fields(msg_id))] +pub async fn route(message: async_nats::jetstream::Message, state: AppHandle) -> Result<()> { + let span = Span::current(); + + if let Some(ref headers) = message.headers { + let context = global::get_text_map_propagator(|propagator| { + propagator.extract(&nats::extractor::HeaderMap(headers)) + }); + span.set_parent(context); + }; + + let payload: Payload = Message::decode(message.payload.as_ref())?; + + match payload.transaction { + Some(ref transaction) => { + let msg_id = match transaction { + warden_core::message::payload::Transaction::Pacs008(pacs008_document) => { + &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id + } + warden_core::message::payload::Transaction::Pacs002(pacs002_document) => { + &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id + } + }; + span.record("msg_id", msg_id); + + let routing = { + let local_cache = state.local_cache.read().await; + local_cache.get(&CACHE_KEY).await + }; + + let routing = match routing { + Some(local) => Some(local), + None => { + let span = trace_span!( + "get.active.routing", + "otel.kind" = "client", + "rpc.service" = "configuration" + ); + let mut client = state.query_routing_client.clone(); + client + .get_active_routing_configuration(google::protobuf::Empty::default()) + .instrument(span) + .await? + .into_inner() + .configuration + } + } + .ok_or_else(|| anyhow::anyhow!("no routing configuration available"))?; + + trace!(tx_tp = ?payload.tx_tp, "finding all rules from configuration"); + let set: HashSet<_> = routing + .messages + .iter() + .filter(|msg| msg.tx_tp == payload.tx_tp) + .flat_map(|msg| &msg.typologies) + .flat_map(|typ| &typ.rules) + .map(|rule| (&rule.id, rule.version())) + .collect(); + + let futs = set.into_iter().map(|value| { + publish::to_rule(value, Arc::clone(&state), payload.clone(), routing.clone()) + }); + + futures_util::future::join_all(futs).await; + } + None => { + warn!("transaction is empty - proceeding with ack"); + } + } + + let span = info_span!("nats.ack"); + message + .ack() + .instrument(span) + .await + .map_err(|_| anyhow::anyhow!("ack error"))?; + + Ok(()) +} diff --git a/crates/router/src/state.rs b/crates/router/src/state.rs new file mode 100644 index 0000000..e01629e --- /dev/null +++ b/crates/router/src/state.rs @@ -0,0 +1,55 @@ +use std::sync::Arc; + +use async_nats::jetstream::Context; +use moka::future::Cache; +use tokio::sync::RwLock; +use tonic::transport::Endpoint; +use tracing::error; +use warden_core::configuration::routing::{ + RoutingConfiguration, query_routing_client::QueryRoutingClient, +}; +use warden_stack::{Configuration}; + +use crate::{ + cnfg::LocalConfig, + processor::grpc::interceptor::{Intercepted, MyInterceptor}, +}; + +#[derive(Clone)] +pub struct Services { + pub jetstream: Context, +} + +pub type AppHandle = Arc<AppState>; + +#[derive(Clone)] +pub struct AppState { + pub services: Services, + pub local_cache: Arc<RwLock<Cache<i32, RoutingConfiguration>>>, + pub config: LocalConfig, + pub query_routing_client: QueryRoutingClient<Intercepted>, +} + +impl AppState { + pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> { + let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + let channel = Endpoint::new(config.config_endpoint.to_string())? + .connect() + .await + .inspect_err(|e| { + error!( + endpoint = ?config.config_endpoint, + "could not connect to config service: {e}", + ) + })?; + + let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor); + + Ok(Self { + services, + config, + local_cache: Arc::new(RwLock::new(Cache::builder().build())), + query_routing_client, + }) + } +} |