From b6924a50c9ec49e1b2b0d286abbbe608410af87d Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Tue, 12 Aug 2025 21:05:07 +0200 Subject: feat(router): get config --- crates/router/src/processor/route.rs | 91 ++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 crates/router/src/processor/route.rs (limited to 'crates/router/src/processor/route.rs') diff --git a/crates/router/src/processor/route.rs b/crates/router/src/processor/route.rs new file mode 100644 index 0000000..404c2ca --- /dev/null +++ b/crates/router/src/processor/route.rs @@ -0,0 +1,91 @@ +use anyhow::Result; +use std::{collections::HashSet, sync::Arc}; + +use opentelemetry::global; +use prost::Message; +use tracing::{info_span, instrument, trace, trace_span, warn, Instrument, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::{google, message::Payload}; +use warden_stack::tracing::telemetry::nats; + +use crate::{cnfg::CACHE_KEY, processor::publish, state::AppHandle}; + +#[instrument(skip(message, state), err(Debug), fields(msg_id))] +pub async fn route(message: async_nats::jetstream::Message, state: AppHandle) -> Result<()> { + let span = Span::current(); + + if let Some(ref headers) = message.headers { + let context = global::get_text_map_propagator(|propagator| { + propagator.extract(&nats::extractor::HeaderMap(headers)) + }); + span.set_parent(context); + }; + + let payload: Payload = Message::decode(message.payload.as_ref())?; + + match payload.transaction { + Some(ref transaction) => { + let msg_id = match transaction { + warden_core::message::payload::Transaction::Pacs008(pacs008_document) => { + &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id + } + warden_core::message::payload::Transaction::Pacs002(pacs002_document) => { + &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id + } + }; + span.record("msg_id", msg_id); + + let routing = { + let local_cache = state.local_cache.read().await; + local_cache.get(&CACHE_KEY).await + }; + + let routing = match routing { + Some(local) => Some(local), + None => { + let span = trace_span!( + "get.active.routing", + "otel.kind" = "client", + "rpc.service" = "configuration" + ); + let mut client = state.query_routing_client.clone(); + client + .get_active_routing_configuration(google::protobuf::Empty::default()) + .instrument(span) + .await? + .into_inner() + .configuration + } + } + .ok_or_else(|| anyhow::anyhow!("no routing configuration available"))?; + + trace!(tx_tp = ?payload.tx_tp, "finding all rules from configuration"); + let set: HashSet<_> = routing + .messages + .iter() + .filter(|msg| msg.tx_tp == payload.tx_tp) + .flat_map(|msg| &msg.typologies) + .flat_map(|typ| &typ.rules) + .map(|rule| (&rule.id, rule.version())) + .collect(); + + let futs = set.into_iter().map(|value| { + publish::to_rule(value, Arc::clone(&state), payload.clone(), routing.clone()) + }); + + futures_util::future::join_all(futs).await; + } + None => { + warn!("transaction is empty - proceeding with ack"); + } + } + + let span = info_span!("nats.ack"); + message + .ack() + .instrument(span) + .await + .map_err(|_| anyhow::anyhow!("ack error"))?; + + Ok(()) +} -- cgit v1.2.3