diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-12 21:05:07 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-12 21:05:07 +0200 |
commit | b6924a50c9ec49e1b2b0d286abbbe608410af87d (patch) | |
tree | 9c5cf583fedfbb585985ac829bbdfdadce1571fe /crates/router/src/processor | |
parent | d75b5fc9c0497f56e6b8602d8ff8991bfaeff18c (diff) | |
download | warden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.tar.bz2 warden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.zip |
feat(router): get config
Diffstat (limited to 'crates/router/src/processor')
-rw-r--r-- | crates/router/src/processor/grpc.rs | 27 | ||||
-rw-r--r-- | crates/router/src/processor/load.rs | 51 | ||||
-rw-r--r-- | crates/router/src/processor/publish.rs | 51 | ||||
-rw-r--r-- | crates/router/src/processor/reload.rs | 58 | ||||
-rw-r--r-- | crates/router/src/processor/route.rs | 91 |
5 files changed, 278 insertions, 0 deletions
diff --git a/crates/router/src/processor/grpc.rs b/crates/router/src/processor/grpc.rs new file mode 100644 index 0000000..344f2a1 --- /dev/null +++ b/crates/router/src/processor/grpc.rs @@ -0,0 +1,27 @@ +pub mod interceptor { + use opentelemetry::global; + use tonic::{ + Status, + service::{Interceptor, interceptor::InterceptedService}, + transport::Channel, + }; + use tracing::Span; + use tracing_opentelemetry::OpenTelemetrySpanExt; + use warden_stack::tracing::telemetry::tonic::injector; + + pub type Intercepted = InterceptedService<Channel, MyInterceptor>; + + #[derive(Clone, Copy)] + pub struct MyInterceptor; + + impl Interceptor for MyInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + let cx = Span::current().context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::MetadataMap(request.metadata_mut())) + }); + + Ok(request) + } + } +} diff --git a/crates/router/src/processor/load.rs b/crates/router/src/processor/load.rs new file mode 100644 index 0000000..9d3fd0d --- /dev/null +++ b/crates/router/src/processor/load.rs @@ -0,0 +1,51 @@ +use opentelemetry_semantic_conventions::attribute; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{Instrument, debug, info, info_span, instrument, warn}; +use warden_core::{configuration::routing::RoutingConfiguration, google }; + +use crate::{cnfg::CACHE_KEY, state::AppHandle}; + +#[instrument(skip(state))] +pub async fn get_routing_config(state: AppHandle) -> Option<RoutingConfiguration> { + debug!("getting routing config"); + { + let span = info_span!("local_cache.get"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "moka"); + let local_cache = state.local_cache.read().await; + if let Some(value) = local_cache.get(&CACHE_KEY).await { + return Some(value); + } + } + + let mut client = state.query_routing_client.clone(); + + let span = info_span!("get.routing.config"); + span.set_attribute(attribute::RPC_SERVICE, env!("CARGO_PKG_NAME")); + span.set_attribute("otel.kind", "client"); + + if let Ok(config) = client + .get_active_routing_configuration(google::protobuf::Empty::default()) + .instrument(span) + .await + { + debug!("fetched routing config"); + let span = info_span!("local_cache.insert"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "moka"); + if let Some(config) = config.into_inner().configuration { + debug!("updating cache"); + let local_cache = state.local_cache.write().await; + local_cache + .insert(CACHE_KEY, config.clone()) + .instrument(span) + .await; + info!("cache refreshed"); + return Some(config); + } else { + warn!("no routing config is active"); + return None; + } + } else { + warn!("no routing config is active"); + return None; + } +} diff --git a/crates/router/src/processor/publish.rs b/crates/router/src/processor/publish.rs new file mode 100644 index 0000000..16dcec8 --- /dev/null +++ b/crates/router/src/processor/publish.rs @@ -0,0 +1,51 @@ +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{Instrument, Span, debug, info, info_span, warn}; +use warden_core::{configuration::routing::RoutingConfiguration, message::Payload}; +use warden_stack::tracing::telemetry::nats::injector; + +use crate::state::AppHandle; + +pub(crate) async fn to_rule( + (rule_id, rule_version): (&String, &str), + state: AppHandle, + mut payload: Payload, + routing: RoutingConfiguration, +) -> anyhow::Result<()> { + // send transaction to next with nats + let subject = format!( + "{}.{rule_id}.v{rule_version}", + state.config.nats.destination_prefix + ); + debug!(subject = ?subject, "publishing"); + + payload.routing = Some(routing); + + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + state + .services + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .instrument(span) + .await + .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?; + + info!("published to rule"); + + Ok(()) +} diff --git a/crates/router/src/processor/reload.rs b/crates/router/src/processor/reload.rs new file mode 100644 index 0000000..c75465c --- /dev/null +++ b/crates/router/src/processor/reload.rs @@ -0,0 +1,58 @@ +use futures_util::StreamExt; +use async_nats::jetstream::consumer; +use tracing::{trace, debug, error, info}; +use uuid::Uuid; +use warden_core::configuration::ReloadEvent; + +use crate::state::AppHandle; + +pub async fn reload(state: AppHandle) -> anyhow::Result<()> { + let id = Uuid::now_v7().to_string(); + info!(durable = id, "listening for configuration changes"); + + let durable = &id; + let consumer = state + .services + .jetstream + .get_stream(state.config.nats.config.stream.to_string()) + .await? + .get_or_create_consumer( + durable, + consumer::pull::Config { + durable_name: Some(durable.to_string()), + filter_subject: state.config.nats.config.reload_subject.to_string(), + deliver_policy: consumer::DeliverPolicy::LastPerSubject, + ..Default::default() + }, + ) + .await?; + + let mut messages = consumer.messages().await?; + while let Some(value) = messages.next().await { + match value { + Ok(message) => { + trace!("got reload cache event",); + if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec()) + .map(|value| ReloadEvent::from_str_name(&value)) + { + match event { + ReloadEvent::Routing => { + let local_cache = state.local_cache.write().await; + local_cache.invalidate_all(); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + _ => { + debug!(event = ?event, "detected reload event, acknowledging"); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + } + } + } + Err(e) => { + error!("{e}") + } + } + } + + Ok(()) +} diff --git a/crates/router/src/processor/route.rs b/crates/router/src/processor/route.rs new file mode 100644 index 0000000..404c2ca --- /dev/null +++ b/crates/router/src/processor/route.rs @@ -0,0 +1,91 @@ +use anyhow::Result; +use std::{collections::HashSet, sync::Arc}; + +use opentelemetry::global; +use prost::Message; +use tracing::{info_span, instrument, trace, trace_span, warn, Instrument, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::{google, message::Payload}; +use warden_stack::tracing::telemetry::nats; + +use crate::{cnfg::CACHE_KEY, processor::publish, state::AppHandle}; + +#[instrument(skip(message, state), err(Debug), fields(msg_id))] +pub async fn route(message: async_nats::jetstream::Message, state: AppHandle) -> Result<()> { + let span = Span::current(); + + if let Some(ref headers) = message.headers { + let context = global::get_text_map_propagator(|propagator| { + propagator.extract(&nats::extractor::HeaderMap(headers)) + }); + span.set_parent(context); + }; + + let payload: Payload = Message::decode(message.payload.as_ref())?; + + match payload.transaction { + Some(ref transaction) => { + let msg_id = match transaction { + warden_core::message::payload::Transaction::Pacs008(pacs008_document) => { + &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id + } + warden_core::message::payload::Transaction::Pacs002(pacs002_document) => { + &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id + } + }; + span.record("msg_id", msg_id); + + let routing = { + let local_cache = state.local_cache.read().await; + local_cache.get(&CACHE_KEY).await + }; + + let routing = match routing { + Some(local) => Some(local), + None => { + let span = trace_span!( + "get.active.routing", + "otel.kind" = "client", + "rpc.service" = "configuration" + ); + let mut client = state.query_routing_client.clone(); + client + .get_active_routing_configuration(google::protobuf::Empty::default()) + .instrument(span) + .await? + .into_inner() + .configuration + } + } + .ok_or_else(|| anyhow::anyhow!("no routing configuration available"))?; + + trace!(tx_tp = ?payload.tx_tp, "finding all rules from configuration"); + let set: HashSet<_> = routing + .messages + .iter() + .filter(|msg| msg.tx_tp == payload.tx_tp) + .flat_map(|msg| &msg.typologies) + .flat_map(|typ| &typ.rules) + .map(|rule| (&rule.id, rule.version())) + .collect(); + + let futs = set.into_iter().map(|value| { + publish::to_rule(value, Arc::clone(&state), payload.clone(), routing.clone()) + }); + + futures_util::future::join_all(futs).await; + } + None => { + warn!("transaction is empty - proceeding with ack"); + } + } + + let span = info_span!("nats.ack"); + message + .ack() + .instrument(span) + .await + .map_err(|_| anyhow::anyhow!("ack error"))?; + + Ok(()) +} |