aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-12 21:05:07 +0200
committerrtkay123 <dev@kanjala.com>2025-08-12 21:05:07 +0200
commitb6924a50c9ec49e1b2b0d286abbbe608410af87d (patch)
tree9c5cf583fedfbb585985ac829bbdfdadce1571fe
parentd75b5fc9c0497f56e6b8602d8ff8991bfaeff18c (diff)
downloadwarden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.tar.bz2
warden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.zip
feat(router): get config
-rw-r--r--Cargo.lock221
-rw-r--r--Cargo.toml1
-rw-r--r--crates/router/Cargo.toml44
-rw-r--r--crates/router/router.toml0
-rw-r--r--crates/router/src/cnfg.rs32
-rw-r--r--crates/router/src/main.rs62
-rw-r--r--crates/router/src/processor.rs108
-rw-r--r--crates/router/src/processor/grpc.rs27
-rw-r--r--crates/router/src/processor/load.rs51
-rw-r--r--crates/router/src/processor/publish.rs51
-rw-r--r--crates/router/src/processor/reload.rs58
-rw-r--r--crates/router/src/processor/route.rs91
-rw-r--r--crates/router/src/state.rs55
-rw-r--r--crates/warden/src/server/routes/processor/pacs002.rs1
-rw-r--r--crates/warden/src/server/routes/processor/pacs008.rs1
-rw-r--r--lib/warden-core/Cargo.toml2
-rw-r--r--lib/warden-core/build.rs10
-rw-r--r--proto/warden_message.proto6
18 files changed, 817 insertions, 4 deletions
diff --git a/Cargo.lock b/Cargo.lock
index cdeea13..682dca1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -116,6 +116,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
+name = "async-lock"
+version = "3.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc"
+dependencies = [
+ "event-listener",
+ "event-listener-strategy",
+ "pin-project-lite",
+]
+
+[[package]]
name = "async-nats"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -544,6 +555,15 @@ dependencies = [
]
[[package]]
+name = "crossbeam-channel"
+version = "0.5.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -775,6 +795,16 @@ dependencies = [
]
[[package]]
+name = "event-listener-strategy"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93"
+dependencies = [
+ "event-listener",
+ "pin-project-lite",
+]
+
+[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -935,6 +965,20 @@ dependencies = [
]
[[package]]
+name = "generator"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d18470a76cb7f8ff746cf1f7470914f900252ec36bbc40b569d74b1258446827"
+dependencies = [
+ "cc",
+ "cfg-if",
+ "libc",
+ "log",
+ "rustversion",
+ "windows",
+]
+
+[[package]]
name = "generic-array"
version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1437,6 +1481,19 @@ dependencies = [
]
[[package]]
+name = "loom"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca"
+dependencies = [
+ "cfg-if",
+ "generator",
+ "scoped-tls",
+ "tracing",
+ "tracing-subscriber",
+]
+
+[[package]]
name = "lru-slab"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1550,6 +1607,28 @@ dependencies = [
]
[[package]]
+name = "moka"
+version = "0.12.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926"
+dependencies = [
+ "async-lock",
+ "crossbeam-channel",
+ "crossbeam-epoch",
+ "crossbeam-utils",
+ "event-listener",
+ "futures-util",
+ "loom",
+ "parking_lot",
+ "portable-atomic",
+ "rustc_version",
+ "smallvec",
+ "tagptr",
+ "thiserror 1.0.69",
+ "uuid",
+]
+
+[[package]]
name = "multimap"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2524,6 +2603,12 @@ dependencies = [
]
[[package]]
+name = "scoped-tls"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
+
+[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3050,6 +3135,12 @@ dependencies = [
]
[[package]]
+name = "tagptr"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
+
+[[package]]
name = "tempfile"
version = "3.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3917,6 +4008,31 @@ dependencies = [
]
[[package]]
+name = "warden-router"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "async-nats",
+ "bytes",
+ "clap",
+ "config",
+ "futures-util",
+ "moka",
+ "opentelemetry",
+ "opentelemetry-semantic-conventions",
+ "prost 0.14.1",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tonic 0.14.0",
+ "tracing",
+ "tracing-opentelemetry",
+ "uuid",
+ "warden-core",
+ "warden-stack",
+]
+
+[[package]]
name = "warden-stack"
version = "0.1.0"
dependencies = [
@@ -4116,12 +4232,108 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
+name = "windows"
+version = "0.61.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893"
+dependencies = [
+ "windows-collections",
+ "windows-core",
+ "windows-future",
+ "windows-link",
+ "windows-numerics",
+]
+
+[[package]]
+name = "windows-collections"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8"
+dependencies = [
+ "windows-core",
+]
+
+[[package]]
+name = "windows-core"
+version = "0.61.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
+dependencies = [
+ "windows-implement",
+ "windows-interface",
+ "windows-link",
+ "windows-result",
+ "windows-strings",
+]
+
+[[package]]
+name = "windows-future"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e"
+dependencies = [
+ "windows-core",
+ "windows-link",
+ "windows-threading",
+]
+
+[[package]]
+name = "windows-implement"
+version = "0.60.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "windows-interface"
+version = "0.59.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "windows-link"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
[[package]]
+name = "windows-numerics"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1"
+dependencies = [
+ "windows-core",
+ "windows-link",
+]
+
+[[package]]
+name = "windows-result"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
+dependencies = [
+ "windows-link",
+]
+
+[[package]]
+name = "windows-strings"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
+dependencies = [
+ "windows-link",
+]
+
+[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4206,6 +4418,15 @@ dependencies = [
]
[[package]]
+name = "windows-threading"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6"
+dependencies = [
+ "windows-link",
+]
+
+[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 8d1ee8d..fee837d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ axum = "0.8.4"
bon = "3.4.0"
clap = "4.5.43"
config = { version = "0.15.13", default-features = false }
+futures-util = { version = "0.3.31", default-features = false }
metrics = { version = "0.24.2", default-features = false }
metrics-exporter-prometheus = { version = "0.17.2", default-features = false }
opentelemetry = { version = "0.30.0", default-features = false }
diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml
new file mode 100644
index 0000000..0b7b232
--- /dev/null
+++ b/crates/router/Cargo.toml
@@ -0,0 +1,44 @@
+[package]
+name = "warden-router"
+version = "0.1.0"
+edition = "2024"
+license.workspace = true
+homepage.workspace = true
+documentation.workspace = true
+description.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+async-nats.workspace = true
+bytes = "1.10.1"
+clap = { workspace = true, features = ["derive"] }
+config = { workspace = true, features = ["convert-case", "toml"] }
+futures-util.workspace = true
+moka = { version = "0.12.10", features = ["future"] }
+opentelemetry.workspace = true
+opentelemetry-semantic-conventions.workspace = true
+prost.workspace = true
+serde = { workspace = true, features = ["derive", "rc"] }
+serde_json.workspace = true
+tokio = { workspace = true, features = [
+ "macros",
+ "rt-multi-thread",
+ "signal",
+] }
+tonic.workspace = true
+tracing.workspace = true
+tracing-opentelemetry.workspace = true
+uuid = { workspace = true, features = ["v7"] }
+warden-core = { workspace = true, features = [
+ "message",
+ "configuration",
+ "serde-time"
+] }
+
+[dependencies.warden-stack]
+workspace = true
+features = [
+ "nats-jetstream",
+ "opentelemetry-tonic",
+ "tracing-loki",
+]
diff --git a/crates/router/router.toml b/crates/router/router.toml
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/crates/router/router.toml
diff --git a/crates/router/src/cnfg.rs b/crates/router/src/cnfg.rs
new file mode 100644
index 0000000..3f6e563
--- /dev/null
+++ b/crates/router/src/cnfg.rs
@@ -0,0 +1,32 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+pub const CACHE_KEY: i32 = 0;
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct LocalConfig {
+ pub config_endpoint: Arc<str>,
+ pub nats: Nats,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct Nats {
+ #[serde(rename = "stream-name")]
+ pub name: Arc<str>,
+ pub subjects: Arc<[String]>,
+ pub destination_prefix: Arc<str>,
+ pub max_messages: i64,
+ pub durable_name: Arc<str>,
+ pub config: ConfigNats,
+}
+
+#[derive(Deserialize, Clone)]
+pub struct ConfigNats {
+ #[serde(rename = "stream")]
+ pub stream: Arc<str>,
+ #[serde(rename = "reload-subject")]
+ pub reload_subject: Arc<str>,
+}
diff --git a/crates/router/src/main.rs b/crates/router/src/main.rs
new file mode 100644
index 0000000..cc4c927
--- /dev/null
+++ b/crates/router/src/main.rs
@@ -0,0 +1,62 @@
+mod cnfg;
+mod processor;
+mod state;
+
+use anyhow::Result;
+use clap::Parser;
+use tracing::error;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+
+/// warden-router
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// Path to config file
+ #[arg(short, long)]
+ config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args = Args::parse();
+ let config = include_str!("../router.toml");
+
+ let mut config = config::Config::builder()
+ .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+ if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+ config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+ };
+
+ let mut config: Configuration = config.build()?.try_deserialize()?;
+ config.application.name = env!("CARGO_CRATE_NAME").into();
+ config.application.version = env!("CARGO_PKG_VERSION").into();
+
+ let tracing = Tracing::builder()
+ .opentelemetry(&config.application, &config.monitoring)?
+ .loki(&config.application, &config.monitoring)?
+ .build(&config.monitoring);
+
+ let provider = tracing.otel_provider;
+
+ tokio::spawn(tracing.loki_task);
+
+ let mut services = Services::builder()
+ .nats_jetstream(&config.nats)
+ .await
+ .inspect_err(|e| error!("nats: {e}"))?
+ .build();
+
+ let jetstream = services
+ .jetstream
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
+
+ let services = state::Services {
+ jetstream,
+ };
+
+ processor::serve(services, config, provider)
+ .await
+ .inspect_err(|e| error!("{e}"))
+}
diff --git a/crates/router/src/processor.rs b/crates/router/src/processor.rs
new file mode 100644
index 0000000..b8c69f3
--- /dev/null
+++ b/crates/router/src/processor.rs
@@ -0,0 +1,108 @@
+pub mod grpc;
+mod reload;
+mod route;
+mod publish;
+mod load;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use async_nats::jetstream::{consumer::{pull, Consumer}, Context};
+use tokio::signal;
+use tracing::{error, trace};
+use warden_stack::{Configuration, tracing::SdkTracerProvider};
+use futures_util::StreamExt;
+
+use crate::{cnfg::Nats, state::{AppHandle, AppState, Services}};
+
+
+pub async fn serve(
+ services: Services,
+ config: Configuration,
+ provider: SdkTracerProvider,
+) -> anyhow::Result<()> {
+ let state = Arc::new(AppState::new(services, config).await?);
+
+ tokio::select! {
+ _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {}
+ _ = shutdown_signal(provider) => {}
+ };
+
+ Ok(())
+}
+
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+ let config = Arc::clone(&state);
+ let (consumer, _) = tokio::join!(
+ get_or_create_stream(&state.services.jetstream, &state.config.nats),
+ load::get_routing_config(Arc::clone(&config))
+ );
+
+ let consumer = consumer?;
+
+ // Consume messages from the consumer
+ while let Some(Ok(message)) = consumer.messages().await?.next().await {
+ if let Err(e) = route::route(message, Arc::clone(&state)).await {
+ error!("{}", e.to_string());
+ }
+ }
+
+ Ok(())
+}
+
+async fn get_or_create_stream(
+ jetstream: &Context,
+ nats: &Nats,
+) -> anyhow::Result<Consumer<pull::Config>> {
+ trace!(name = ?nats.name, "getting or creating stream");
+ let stream = jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name: nats.name.to_string(),
+ subjects: nats.subjects.iter().map(Into::into).collect(),
+ max_messages: nats.max_messages,
+ ..Default::default()
+ })
+ .await?;
+ let durable = nats.durable_name.to_string();
+ // Get or create a pull-based consumer
+ Ok(stream
+ .get_or_create_consumer(
+ durable.as_ref(),
+ async_nats::jetstream::consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ ..Default::default()
+ },
+ )
+ .await?)
+}
+
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ },
+ _ = terminate => {
+ },
+ }
+ let _ = provider.shutdown();
+
+ Ok(())
+}
diff --git a/crates/router/src/processor/grpc.rs b/crates/router/src/processor/grpc.rs
new file mode 100644
index 0000000..344f2a1
--- /dev/null
+++ b/crates/router/src/processor/grpc.rs
@@ -0,0 +1,27 @@
+pub mod interceptor {
+ use opentelemetry::global;
+ use tonic::{
+ Status,
+ service::{Interceptor, interceptor::InterceptedService},
+ transport::Channel,
+ };
+ use tracing::Span;
+ use tracing_opentelemetry::OpenTelemetrySpanExt;
+ use warden_stack::tracing::telemetry::tonic::injector;
+
+ pub type Intercepted = InterceptedService<Channel, MyInterceptor>;
+
+ #[derive(Clone, Copy)]
+ pub struct MyInterceptor;
+
+ impl Interceptor for MyInterceptor {
+ fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
+ let cx = Span::current().context();
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&cx, &mut injector::MetadataMap(request.metadata_mut()))
+ });
+
+ Ok(request)
+ }
+ }
+}
diff --git a/crates/router/src/processor/load.rs b/crates/router/src/processor/load.rs
new file mode 100644
index 0000000..9d3fd0d
--- /dev/null
+++ b/crates/router/src/processor/load.rs
@@ -0,0 +1,51 @@
+use opentelemetry_semantic_conventions::attribute;
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use tracing::{Instrument, debug, info, info_span, instrument, warn};
+use warden_core::{configuration::routing::RoutingConfiguration, google };
+
+use crate::{cnfg::CACHE_KEY, state::AppHandle};
+
+#[instrument(skip(state))]
+pub async fn get_routing_config(state: AppHandle) -> Option<RoutingConfiguration> {
+ debug!("getting routing config");
+ {
+ let span = info_span!("local_cache.get");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "moka");
+ let local_cache = state.local_cache.read().await;
+ if let Some(value) = local_cache.get(&CACHE_KEY).await {
+ return Some(value);
+ }
+ }
+
+ let mut client = state.query_routing_client.clone();
+
+ let span = info_span!("get.routing.config");
+ span.set_attribute(attribute::RPC_SERVICE, env!("CARGO_PKG_NAME"));
+ span.set_attribute("otel.kind", "client");
+
+ if let Ok(config) = client
+ .get_active_routing_configuration(google::protobuf::Empty::default())
+ .instrument(span)
+ .await
+ {
+ debug!("fetched routing config");
+ let span = info_span!("local_cache.insert");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "moka");
+ if let Some(config) = config.into_inner().configuration {
+ debug!("updating cache");
+ let local_cache = state.local_cache.write().await;
+ local_cache
+ .insert(CACHE_KEY, config.clone())
+ .instrument(span)
+ .await;
+ info!("cache refreshed");
+ return Some(config);
+ } else {
+ warn!("no routing config is active");
+ return None;
+ }
+ } else {
+ warn!("no routing config is active");
+ return None;
+ }
+}
diff --git a/crates/router/src/processor/publish.rs b/crates/router/src/processor/publish.rs
new file mode 100644
index 0000000..16dcec8
--- /dev/null
+++ b/crates/router/src/processor/publish.rs
@@ -0,0 +1,51 @@
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use tracing::{Instrument, Span, debug, info, info_span, warn};
+use warden_core::{configuration::routing::RoutingConfiguration, message::Payload};
+use warden_stack::tracing::telemetry::nats::injector;
+
+use crate::state::AppHandle;
+
+pub(crate) async fn to_rule(
+ (rule_id, rule_version): (&String, &str),
+ state: AppHandle,
+ mut payload: Payload,
+ routing: RoutingConfiguration,
+) -> anyhow::Result<()> {
+ // send transaction to next with nats
+ let subject = format!(
+ "{}.{rule_id}.v{rule_version}",
+ state.config.nats.destination_prefix
+ );
+ debug!(subject = ?subject, "publishing");
+
+ payload.routing = Some(routing);
+
+ let payload = prost::Message::encode_to_vec(&payload);
+
+ let mut headers = async_nats::HeaderMap::new();
+
+ let cx = Span::current().context();
+
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers))
+ });
+
+ let span = info_span!("nats.publish");
+ span.set_attribute(
+ attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
+ subject.to_string(),
+ );
+ state
+ .services
+ .jetstream
+ .publish_with_headers(subject.clone(), headers, payload.into())
+ .instrument(span)
+ .await
+ .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?;
+
+ info!("published to rule");
+
+ Ok(())
+}
diff --git a/crates/router/src/processor/reload.rs b/crates/router/src/processor/reload.rs
new file mode 100644
index 0000000..c75465c
--- /dev/null
+++ b/crates/router/src/processor/reload.rs
@@ -0,0 +1,58 @@
+use futures_util::StreamExt;
+use async_nats::jetstream::consumer;
+use tracing::{trace, debug, error, info};
+use uuid::Uuid;
+use warden_core::configuration::ReloadEvent;
+
+use crate::state::AppHandle;
+
+pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
+ let id = Uuid::now_v7().to_string();
+ info!(durable = id, "listening for configuration changes");
+
+ let durable = &id;
+ let consumer = state
+ .services
+ .jetstream
+ .get_stream(state.config.nats.config.stream.to_string())
+ .await?
+ .get_or_create_consumer(
+ durable,
+ consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ filter_subject: state.config.nats.config.reload_subject.to_string(),
+ deliver_policy: consumer::DeliverPolicy::LastPerSubject,
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let mut messages = consumer.messages().await?;
+ while let Some(value) = messages.next().await {
+ match value {
+ Ok(message) => {
+ trace!("got reload cache event",);
+ if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec())
+ .map(|value| ReloadEvent::from_str_name(&value))
+ {
+ match event {
+ ReloadEvent::Routing => {
+ let local_cache = state.local_cache.write().await;
+ local_cache.invalidate_all();
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ _ => {
+ debug!(event = ?event, "detected reload event, acknowledging");
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ }
+ }
+ }
+ Err(e) => {
+ error!("{e}")
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/crates/router/src/processor/route.rs b/crates/router/src/processor/route.rs
new file mode 100644
index 0000000..404c2ca
--- /dev/null
+++ b/crates/router/src/processor/route.rs
@@ -0,0 +1,91 @@
+use anyhow::Result;
+use std::{collections::HashSet, sync::Arc};
+
+use opentelemetry::global;
+use prost::Message;
+use tracing::{info_span, instrument, trace, trace_span, warn, Instrument, Span};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::{google, message::Payload};
+use warden_stack::tracing::telemetry::nats;
+
+use crate::{cnfg::CACHE_KEY, processor::publish, state::AppHandle};
+
+#[instrument(skip(message, state), err(Debug), fields(msg_id))]
+pub async fn route(message: async_nats::jetstream::Message, state: AppHandle) -> Result<()> {
+ let span = Span::current();
+
+ if let Some(ref headers) = message.headers {
+ let context = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&nats::extractor::HeaderMap(headers))
+ });
+ span.set_parent(context);
+ };
+
+ let payload: Payload = Message::decode(message.payload.as_ref())?;
+
+ match payload.transaction {
+ Some(ref transaction) => {
+ let msg_id = match transaction {
+ warden_core::message::payload::Transaction::Pacs008(pacs008_document) => {
+ &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id
+ }
+ warden_core::message::payload::Transaction::Pacs002(pacs002_document) => {
+ &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id
+ }
+ };
+ span.record("msg_id", msg_id);
+
+ let routing = {
+ let local_cache = state.local_cache.read().await;
+ local_cache.get(&CACHE_KEY).await
+ };
+
+ let routing = match routing {
+ Some(local) => Some(local),
+ None => {
+ let span = trace_span!(
+ "get.active.routing",
+ "otel.kind" = "client",
+ "rpc.service" = "configuration"
+ );
+ let mut client = state.query_routing_client.clone();
+ client
+ .get_active_routing_configuration(google::protobuf::Empty::default())
+ .instrument(span)
+ .await?
+ .into_inner()
+ .configuration
+ }
+ }
+ .ok_or_else(|| anyhow::anyhow!("no routing configuration available"))?;
+
+ trace!(tx_tp = ?payload.tx_tp, "finding all rules from configuration");
+ let set: HashSet<_> = routing
+ .messages
+ .iter()
+ .filter(|msg| msg.tx_tp == payload.tx_tp)
+ .flat_map(|msg| &msg.typologies)
+ .flat_map(|typ| &typ.rules)
+ .map(|rule| (&rule.id, rule.version()))
+ .collect();
+
+ let futs = set.into_iter().map(|value| {
+ publish::to_rule(value, Arc::clone(&state), payload.clone(), routing.clone())
+ });
+
+ futures_util::future::join_all(futs).await;
+ }
+ None => {
+ warn!("transaction is empty - proceeding with ack");
+ }
+ }
+
+ let span = info_span!("nats.ack");
+ message
+ .ack()
+ .instrument(span)
+ .await
+ .map_err(|_| anyhow::anyhow!("ack error"))?;
+
+ Ok(())
+}
diff --git a/crates/router/src/state.rs b/crates/router/src/state.rs
new file mode 100644
index 0000000..e01629e
--- /dev/null
+++ b/crates/router/src/state.rs
@@ -0,0 +1,55 @@
+use std::sync::Arc;
+
+use async_nats::jetstream::Context;
+use moka::future::Cache;
+use tokio::sync::RwLock;
+use tonic::transport::Endpoint;
+use tracing::error;
+use warden_core::configuration::routing::{
+ RoutingConfiguration, query_routing_client::QueryRoutingClient,
+};
+use warden_stack::{Configuration};
+
+use crate::{
+ cnfg::LocalConfig,
+ processor::grpc::interceptor::{Intercepted, MyInterceptor},
+};
+
+#[derive(Clone)]
+pub struct Services {
+ pub jetstream: Context,
+}
+
+pub type AppHandle = Arc<AppState>;
+
+#[derive(Clone)]
+pub struct AppState {
+ pub services: Services,
+ pub local_cache: Arc<RwLock<Cache<i32, RoutingConfiguration>>>,
+ pub config: LocalConfig,
+ pub query_routing_client: QueryRoutingClient<Intercepted>,
+}
+
+impl AppState {
+ pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> {
+ let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?;
+ let channel = Endpoint::new(config.config_endpoint.to_string())?
+ .connect()
+ .await
+ .inspect_err(|e| {
+ error!(
+ endpoint = ?config.config_endpoint,
+ "could not connect to config service: {e}",
+ )
+ })?;
+
+ let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor);
+
+ Ok(Self {
+ services,
+ config,
+ local_cache: Arc::new(RwLock::new(Cache::builder().build())),
+ query_routing_client,
+ })
+ }
+}
diff --git a/crates/warden/src/server/routes/processor/pacs002.rs b/crates/warden/src/server/routes/processor/pacs002.rs
index 503939c..fbfda3c 100644
--- a/crates/warden/src/server/routes/processor/pacs002.rs
+++ b/crates/warden/src/server/routes/processor/pacs002.rs
@@ -189,6 +189,7 @@ pub async fn post_pacs002(
transaction: Some(warden_core::message::payload::Transaction::Pacs002(
request.clone(),
)),
+ ..Default::default()
};
publish_message(&state, payload, msg_id).await?;
diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs
index b7f495e..8d94ca9 100644
--- a/crates/warden/src/server/routes/processor/pacs008.rs
+++ b/crates/warden/src/server/routes/processor/pacs008.rs
@@ -175,6 +175,7 @@ pub(super) async fn post_pacs008(
transaction.clone(),
)),
data_cache: Some(data_cache),
+ ..Default::default()
};
publish_message(&state, payload, msg_id).await?;
diff --git a/lib/warden-core/Cargo.toml b/lib/warden-core/Cargo.toml
index dc9e83e..67ea279 100644
--- a/lib/warden-core/Cargo.toml
+++ b/lib/warden-core/Cargo.toml
@@ -24,7 +24,7 @@ utoipa = { workspace = true, optional = true }
[features]
default = []
configuration = ["dep:prost", "dep:tonic", "dep:tonic-types", "dep:tonic-prost"]
-message = ["dep:prost", "dep:tonic", "dep:tonic-types", "dep:tonic-prost"]
+message = ["configuration"]
pseudonyms = ["dep:prost", "dep:tonic", "dep:tonic-types", "dep:tonic-prost"]
serde = ["dep:serde", "serde/derive", "dep:serde_json"]
serde-time = [
diff --git a/lib/warden-core/build.rs b/lib/warden-core/build.rs
index 3992cd8..83b0407 100644
--- a/lib/warden-core/build.rs
+++ b/lib/warden-core/build.rs
@@ -20,10 +20,18 @@ impl Entity {
#[cfg(feature = "configuration")]
fn configuration_protos() -> Vec<&'static str> {
+ if cfg!(feature = "message") {
+ vec![
+ "proto/configuration/reload_event.proto",
+ ]
+ } else {
vec![
- "proto/configuration/routing.proto",
+ "proto/configuration/routing.proto",
"proto/configuration/reload_event.proto",
]
+
+ }
+
}
#[cfg(feature = "pseudonyms")]
diff --git a/proto/warden_message.proto b/proto/warden_message.proto
index d9c0cd4..706139c 100644
--- a/proto/warden_message.proto
+++ b/proto/warden_message.proto
@@ -2,9 +2,10 @@ syntax = "proto3";
package message;
-import "proto/iso20022/pacs_008_001_12.proto";
-import "proto/iso20022/pacs_002_001_12.proto";
+import "configuration/routing.proto";
import "google/protobuf/timestamp.proto";
+import "proto/iso20022/pacs_002_001_12.proto";
+import "proto/iso20022/pacs_008_001_12.proto";
message Payload {
oneof transaction {
@@ -13,6 +14,7 @@ message Payload {
}
DataCache data_cache = 3;
string tx_tp = 4;
+ configuration.routing.RoutingConfiguration routing = 5;
}
message DataCache {