aboutsummaryrefslogtreecommitdiffstats
path: root/crates/router/src/processor
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-12 21:05:07 +0200
committerrtkay123 <dev@kanjala.com>2025-08-12 21:05:07 +0200
commitb6924a50c9ec49e1b2b0d286abbbe608410af87d (patch)
tree9c5cf583fedfbb585985ac829bbdfdadce1571fe /crates/router/src/processor
parentd75b5fc9c0497f56e6b8602d8ff8991bfaeff18c (diff)
downloadwarden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.tar.bz2
warden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.zip
feat(router): get config
Diffstat (limited to 'crates/router/src/processor')
-rw-r--r--crates/router/src/processor/grpc.rs27
-rw-r--r--crates/router/src/processor/load.rs51
-rw-r--r--crates/router/src/processor/publish.rs51
-rw-r--r--crates/router/src/processor/reload.rs58
-rw-r--r--crates/router/src/processor/route.rs91
5 files changed, 278 insertions, 0 deletions
diff --git a/crates/router/src/processor/grpc.rs b/crates/router/src/processor/grpc.rs
new file mode 100644
index 0000000..344f2a1
--- /dev/null
+++ b/crates/router/src/processor/grpc.rs
@@ -0,0 +1,27 @@
+pub mod interceptor {
+ use opentelemetry::global;
+ use tonic::{
+ Status,
+ service::{Interceptor, interceptor::InterceptedService},
+ transport::Channel,
+ };
+ use tracing::Span;
+ use tracing_opentelemetry::OpenTelemetrySpanExt;
+ use warden_stack::tracing::telemetry::tonic::injector;
+
+ pub type Intercepted = InterceptedService<Channel, MyInterceptor>;
+
+ #[derive(Clone, Copy)]
+ pub struct MyInterceptor;
+
+ impl Interceptor for MyInterceptor {
+ fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
+ let cx = Span::current().context();
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&cx, &mut injector::MetadataMap(request.metadata_mut()))
+ });
+
+ Ok(request)
+ }
+ }
+}
diff --git a/crates/router/src/processor/load.rs b/crates/router/src/processor/load.rs
new file mode 100644
index 0000000..9d3fd0d
--- /dev/null
+++ b/crates/router/src/processor/load.rs
@@ -0,0 +1,51 @@
+use opentelemetry_semantic_conventions::attribute;
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use tracing::{Instrument, debug, info, info_span, instrument, warn};
+use warden_core::{configuration::routing::RoutingConfiguration, google };
+
+use crate::{cnfg::CACHE_KEY, state::AppHandle};
+
+#[instrument(skip(state))]
+pub async fn get_routing_config(state: AppHandle) -> Option<RoutingConfiguration> {
+ debug!("getting routing config");
+ {
+ let span = info_span!("local_cache.get");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "moka");
+ let local_cache = state.local_cache.read().await;
+ if let Some(value) = local_cache.get(&CACHE_KEY).await {
+ return Some(value);
+ }
+ }
+
+ let mut client = state.query_routing_client.clone();
+
+ let span = info_span!("get.routing.config");
+ span.set_attribute(attribute::RPC_SERVICE, env!("CARGO_PKG_NAME"));
+ span.set_attribute("otel.kind", "client");
+
+ if let Ok(config) = client
+ .get_active_routing_configuration(google::protobuf::Empty::default())
+ .instrument(span)
+ .await
+ {
+ debug!("fetched routing config");
+ let span = info_span!("local_cache.insert");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "moka");
+ if let Some(config) = config.into_inner().configuration {
+ debug!("updating cache");
+ let local_cache = state.local_cache.write().await;
+ local_cache
+ .insert(CACHE_KEY, config.clone())
+ .instrument(span)
+ .await;
+ info!("cache refreshed");
+ return Some(config);
+ } else {
+ warn!("no routing config is active");
+ return None;
+ }
+ } else {
+ warn!("no routing config is active");
+ return None;
+ }
+}
diff --git a/crates/router/src/processor/publish.rs b/crates/router/src/processor/publish.rs
new file mode 100644
index 0000000..16dcec8
--- /dev/null
+++ b/crates/router/src/processor/publish.rs
@@ -0,0 +1,51 @@
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use tracing::{Instrument, Span, debug, info, info_span, warn};
+use warden_core::{configuration::routing::RoutingConfiguration, message::Payload};
+use warden_stack::tracing::telemetry::nats::injector;
+
+use crate::state::AppHandle;
+
+pub(crate) async fn to_rule(
+ (rule_id, rule_version): (&String, &str),
+ state: AppHandle,
+ mut payload: Payload,
+ routing: RoutingConfiguration,
+) -> anyhow::Result<()> {
+ // send transaction to next with nats
+ let subject = format!(
+ "{}.{rule_id}.v{rule_version}",
+ state.config.nats.destination_prefix
+ );
+ debug!(subject = ?subject, "publishing");
+
+ payload.routing = Some(routing);
+
+ let payload = prost::Message::encode_to_vec(&payload);
+
+ let mut headers = async_nats::HeaderMap::new();
+
+ let cx = Span::current().context();
+
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers))
+ });
+
+ let span = info_span!("nats.publish");
+ span.set_attribute(
+ attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
+ subject.to_string(),
+ );
+ state
+ .services
+ .jetstream
+ .publish_with_headers(subject.clone(), headers, payload.into())
+ .instrument(span)
+ .await
+ .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?;
+
+ info!("published to rule");
+
+ Ok(())
+}
diff --git a/crates/router/src/processor/reload.rs b/crates/router/src/processor/reload.rs
new file mode 100644
index 0000000..c75465c
--- /dev/null
+++ b/crates/router/src/processor/reload.rs
@@ -0,0 +1,58 @@
+use futures_util::StreamExt;
+use async_nats::jetstream::consumer;
+use tracing::{trace, debug, error, info};
+use uuid::Uuid;
+use warden_core::configuration::ReloadEvent;
+
+use crate::state::AppHandle;
+
+pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
+ let id = Uuid::now_v7().to_string();
+ info!(durable = id, "listening for configuration changes");
+
+ let durable = &id;
+ let consumer = state
+ .services
+ .jetstream
+ .get_stream(state.config.nats.config.stream.to_string())
+ .await?
+ .get_or_create_consumer(
+ durable,
+ consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ filter_subject: state.config.nats.config.reload_subject.to_string(),
+ deliver_policy: consumer::DeliverPolicy::LastPerSubject,
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let mut messages = consumer.messages().await?;
+ while let Some(value) = messages.next().await {
+ match value {
+ Ok(message) => {
+ trace!("got reload cache event",);
+ if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec())
+ .map(|value| ReloadEvent::from_str_name(&value))
+ {
+ match event {
+ ReloadEvent::Routing => {
+ let local_cache = state.local_cache.write().await;
+ local_cache.invalidate_all();
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ _ => {
+ debug!(event = ?event, "detected reload event, acknowledging");
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ }
+ }
+ }
+ Err(e) => {
+ error!("{e}")
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/crates/router/src/processor/route.rs b/crates/router/src/processor/route.rs
new file mode 100644
index 0000000..404c2ca
--- /dev/null
+++ b/crates/router/src/processor/route.rs
@@ -0,0 +1,91 @@
+use anyhow::Result;
+use std::{collections::HashSet, sync::Arc};
+
+use opentelemetry::global;
+use prost::Message;
+use tracing::{info_span, instrument, trace, trace_span, warn, Instrument, Span};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::{google, message::Payload};
+use warden_stack::tracing::telemetry::nats;
+
+use crate::{cnfg::CACHE_KEY, processor::publish, state::AppHandle};
+
+#[instrument(skip(message, state), err(Debug), fields(msg_id))]
+pub async fn route(message: async_nats::jetstream::Message, state: AppHandle) -> Result<()> {
+ let span = Span::current();
+
+ if let Some(ref headers) = message.headers {
+ let context = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&nats::extractor::HeaderMap(headers))
+ });
+ span.set_parent(context);
+ };
+
+ let payload: Payload = Message::decode(message.payload.as_ref())?;
+
+ match payload.transaction {
+ Some(ref transaction) => {
+ let msg_id = match transaction {
+ warden_core::message::payload::Transaction::Pacs008(pacs008_document) => {
+ &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id
+ }
+ warden_core::message::payload::Transaction::Pacs002(pacs002_document) => {
+ &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id
+ }
+ };
+ span.record("msg_id", msg_id);
+
+ let routing = {
+ let local_cache = state.local_cache.read().await;
+ local_cache.get(&CACHE_KEY).await
+ };
+
+ let routing = match routing {
+ Some(local) => Some(local),
+ None => {
+ let span = trace_span!(
+ "get.active.routing",
+ "otel.kind" = "client",
+ "rpc.service" = "configuration"
+ );
+ let mut client = state.query_routing_client.clone();
+ client
+ .get_active_routing_configuration(google::protobuf::Empty::default())
+ .instrument(span)
+ .await?
+ .into_inner()
+ .configuration
+ }
+ }
+ .ok_or_else(|| anyhow::anyhow!("no routing configuration available"))?;
+
+ trace!(tx_tp = ?payload.tx_tp, "finding all rules from configuration");
+ let set: HashSet<_> = routing
+ .messages
+ .iter()
+ .filter(|msg| msg.tx_tp == payload.tx_tp)
+ .flat_map(|msg| &msg.typologies)
+ .flat_map(|typ| &typ.rules)
+ .map(|rule| (&rule.id, rule.version()))
+ .collect();
+
+ let futs = set.into_iter().map(|value| {
+ publish::to_rule(value, Arc::clone(&state), payload.clone(), routing.clone())
+ });
+
+ futures_util::future::join_all(futs).await;
+ }
+ None => {
+ warn!("transaction is empty - proceeding with ack");
+ }
+ }
+
+ let span = info_span!("nats.ack");
+ message
+ .ack()
+ .instrument(span)
+ .await
+ .map_err(|_| anyhow::anyhow!("ack error"))?;
+
+ Ok(())
+}