aboutsummaryrefslogtreecommitdiffstats
path: root/crates/router/src/processor/route.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/router/src/processor/route.rs')
-rw-r--r--crates/router/src/processor/route.rs91
1 files changed, 91 insertions, 0 deletions
diff --git a/crates/router/src/processor/route.rs b/crates/router/src/processor/route.rs
new file mode 100644
index 0000000..404c2ca
--- /dev/null
+++ b/crates/router/src/processor/route.rs
@@ -0,0 +1,91 @@
+use anyhow::Result;
+use std::{collections::HashSet, sync::Arc};
+
+use opentelemetry::global;
+use prost::Message;
+use tracing::{info_span, instrument, trace, trace_span, warn, Instrument, Span};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::{google, message::Payload};
+use warden_stack::tracing::telemetry::nats;
+
+use crate::{cnfg::CACHE_KEY, processor::publish, state::AppHandle};
+
+#[instrument(skip(message, state), err(Debug), fields(msg_id))]
+pub async fn route(message: async_nats::jetstream::Message, state: AppHandle) -> Result<()> {
+ let span = Span::current();
+
+ if let Some(ref headers) = message.headers {
+ let context = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&nats::extractor::HeaderMap(headers))
+ });
+ span.set_parent(context);
+ };
+
+ let payload: Payload = Message::decode(message.payload.as_ref())?;
+
+ match payload.transaction {
+ Some(ref transaction) => {
+ let msg_id = match transaction {
+ warden_core::message::payload::Transaction::Pacs008(pacs008_document) => {
+ &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id
+ }
+ warden_core::message::payload::Transaction::Pacs002(pacs002_document) => {
+ &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id
+ }
+ };
+ span.record("msg_id", msg_id);
+
+ let routing = {
+ let local_cache = state.local_cache.read().await;
+ local_cache.get(&CACHE_KEY).await
+ };
+
+ let routing = match routing {
+ Some(local) => Some(local),
+ None => {
+ let span = trace_span!(
+ "get.active.routing",
+ "otel.kind" = "client",
+ "rpc.service" = "configuration"
+ );
+ let mut client = state.query_routing_client.clone();
+ client
+ .get_active_routing_configuration(google::protobuf::Empty::default())
+ .instrument(span)
+ .await?
+ .into_inner()
+ .configuration
+ }
+ }
+ .ok_or_else(|| anyhow::anyhow!("no routing configuration available"))?;
+
+ trace!(tx_tp = ?payload.tx_tp, "finding all rules from configuration");
+ let set: HashSet<_> = routing
+ .messages
+ .iter()
+ .filter(|msg| msg.tx_tp == payload.tx_tp)
+ .flat_map(|msg| &msg.typologies)
+ .flat_map(|typ| &typ.rules)
+ .map(|rule| (&rule.id, rule.version()))
+ .collect();
+
+ let futs = set.into_iter().map(|value| {
+ publish::to_rule(value, Arc::clone(&state), payload.clone(), routing.clone())
+ });
+
+ futures_util::future::join_all(futs).await;
+ }
+ None => {
+ warn!("transaction is empty - proceeding with ack");
+ }
+ }
+
+ let span = info_span!("nats.ack");
+ message
+ .ack()
+ .instrument(span)
+ .await
+ .map_err(|_| anyhow::anyhow!("ack error"))?;
+
+ Ok(())
+}