diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-12 21:05:07 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-12 21:05:07 +0200 |
commit | b6924a50c9ec49e1b2b0d286abbbe608410af87d (patch) | |
tree | 9c5cf583fedfbb585985ac829bbdfdadce1571fe | |
parent | d75b5fc9c0497f56e6b8602d8ff8991bfaeff18c (diff) | |
download | warden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.tar.bz2 warden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.zip |
feat(router): get config
-rw-r--r-- | Cargo.lock | 221 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | crates/router/Cargo.toml | 44 | ||||
-rw-r--r-- | crates/router/router.toml | 0 | ||||
-rw-r--r-- | crates/router/src/cnfg.rs | 32 | ||||
-rw-r--r-- | crates/router/src/main.rs | 62 | ||||
-rw-r--r-- | crates/router/src/processor.rs | 108 | ||||
-rw-r--r-- | crates/router/src/processor/grpc.rs | 27 | ||||
-rw-r--r-- | crates/router/src/processor/load.rs | 51 | ||||
-rw-r--r-- | crates/router/src/processor/publish.rs | 51 | ||||
-rw-r--r-- | crates/router/src/processor/reload.rs | 58 | ||||
-rw-r--r-- | crates/router/src/processor/route.rs | 91 | ||||
-rw-r--r-- | crates/router/src/state.rs | 55 | ||||
-rw-r--r-- | crates/warden/src/server/routes/processor/pacs002.rs | 1 | ||||
-rw-r--r-- | crates/warden/src/server/routes/processor/pacs008.rs | 1 | ||||
-rw-r--r-- | lib/warden-core/Cargo.toml | 2 | ||||
-rw-r--r-- | lib/warden-core/build.rs | 10 | ||||
-rw-r--r-- | proto/warden_message.proto | 6 |
18 files changed, 817 insertions, 4 deletions
@@ -116,6 +116,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" [[package]] +name = "async-lock" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] name = "async-nats" version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -544,6 +555,15 @@ dependencies = [ ] [[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] name = "crossbeam-epoch" version = "0.9.18" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -775,6 +795,16 @@ dependencies = [ ] [[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + +[[package]] name = "fastrand" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -935,6 +965,20 @@ dependencies = [ ] [[package]] +name = "generator" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d18470a76cb7f8ff746cf1f7470914f900252ec36bbc40b569d74b1258446827" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows", +] + +[[package]] name = "generic-array" version = "0.14.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1437,6 +1481,19 @@ dependencies = [ ] [[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + +[[package]] name = "lru-slab" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1550,6 +1607,28 @@ dependencies = [ ] [[package]] +name = "moka" +version = "0.12.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" +dependencies = [ + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "event-listener", + "futures-util", + "loom", + "parking_lot", + "portable-atomic", + "rustc_version", + "smallvec", + "tagptr", + "thiserror 1.0.69", + "uuid", +] + +[[package]] name = "multimap" version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2524,6 +2603,12 @@ dependencies = [ ] [[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + +[[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3050,6 +3135,12 @@ dependencies = [ ] [[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + +[[package]] name = "tempfile" version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3917,6 +4008,31 @@ dependencies = [ ] [[package]] +name = "warden-router" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-nats", + "bytes", + "clap", + "config", + "futures-util", + "moka", + "opentelemetry", + "opentelemetry-semantic-conventions", + "prost 0.14.1", + "serde", + "serde_json", + "tokio", + "tonic 0.14.0", + "tracing", + "tracing-opentelemetry", + "uuid", + "warden-core", + "warden-stack", +] + +[[package]] name = "warden-stack" version = "0.1.0" dependencies = [ @@ -4116,12 +4232,108 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] +name = "windows" +version = "0.61.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-link", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" +dependencies = [ + "windows-core", +] + +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-future" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", +] + +[[package]] +name = "windows-implement" +version = "0.60.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "windows-link" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" [[package]] +name = "windows-numerics" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" +dependencies = [ + "windows-core", + "windows-link", +] + +[[package]] +name = "windows-result" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" +dependencies = [ + "windows-link", +] + +[[package]] name = "windows-sys" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4206,6 +4418,15 @@ dependencies = [ ] [[package]] +name = "windows-threading" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" +dependencies = [ + "windows-link", +] + +[[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -15,6 +15,7 @@ axum = "0.8.4" bon = "3.4.0" clap = "4.5.43" config = { version = "0.15.13", default-features = false } +futures-util = { version = "0.3.31", default-features = false } metrics = { version = "0.24.2", default-features = false } metrics-exporter-prometheus = { version = "0.17.2", default-features = false } opentelemetry = { version = "0.30.0", default-features = false } diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml new file mode 100644 index 0000000..0b7b232 --- /dev/null +++ b/crates/router/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "warden-router" +version = "0.1.0" +edition = "2024" +license.workspace = true +homepage.workspace = true +documentation.workspace = true +description.workspace = true + +[dependencies] +anyhow.workspace = true +async-nats.workspace = true +bytes = "1.10.1" +clap = { workspace = true, features = ["derive"] } +config = { workspace = true, features = ["convert-case", "toml"] } +futures-util.workspace = true +moka = { version = "0.12.10", features = ["future"] } +opentelemetry.workspace = true +opentelemetry-semantic-conventions.workspace = true +prost.workspace = true +serde = { workspace = true, features = ["derive", "rc"] } +serde_json.workspace = true +tokio = { workspace = true, features = [ + "macros", + "rt-multi-thread", + "signal", +] } +tonic.workspace = true +tracing.workspace = true +tracing-opentelemetry.workspace = true +uuid = { workspace = true, features = ["v7"] } +warden-core = { workspace = true, features = [ + "message", + "configuration", + "serde-time" +] } + +[dependencies.warden-stack] +workspace = true +features = [ + "nats-jetstream", + "opentelemetry-tonic", + "tracing-loki", +] diff --git a/crates/router/router.toml b/crates/router/router.toml new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/crates/router/router.toml diff --git a/crates/router/src/cnfg.rs b/crates/router/src/cnfg.rs new file mode 100644 index 0000000..3f6e563 --- /dev/null +++ b/crates/router/src/cnfg.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; + +use serde::Deserialize; + +pub const CACHE_KEY: i32 = 0; + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct LocalConfig { + pub config_endpoint: Arc<str>, + pub nats: Nats, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct Nats { + #[serde(rename = "stream-name")] + pub name: Arc<str>, + pub subjects: Arc<[String]>, + pub destination_prefix: Arc<str>, + pub max_messages: i64, + pub durable_name: Arc<str>, + pub config: ConfigNats, +} + +#[derive(Deserialize, Clone)] +pub struct ConfigNats { + #[serde(rename = "stream")] + pub stream: Arc<str>, + #[serde(rename = "reload-subject")] + pub reload_subject: Arc<str>, +} diff --git a/crates/router/src/main.rs b/crates/router/src/main.rs new file mode 100644 index 0000000..cc4c927 --- /dev/null +++ b/crates/router/src/main.rs @@ -0,0 +1,62 @@ +mod cnfg; +mod processor; +mod state; + +use anyhow::Result; +use clap::Parser; +use tracing::error; +use warden_stack::{Configuration, Services, tracing::Tracing}; + +/// warden-router +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option<std::path::PathBuf>, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + let config = include_str!("../router.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder() + .opentelemetry(&config.application, &config.monitoring)? + .loki(&config.application, &config.monitoring)? + .build(&config.monitoring); + + let provider = tracing.otel_provider; + + tokio::spawn(tracing.loki_task); + + let mut services = Services::builder() + .nats_jetstream(&config.nats) + .await + .inspect_err(|e| error!("nats: {e}"))? + .build(); + + let jetstream = services + .jetstream + .take() + .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; + + let services = state::Services { + jetstream, + }; + + processor::serve(services, config, provider) + .await + .inspect_err(|e| error!("{e}")) +} diff --git a/crates/router/src/processor.rs b/crates/router/src/processor.rs new file mode 100644 index 0000000..b8c69f3 --- /dev/null +++ b/crates/router/src/processor.rs @@ -0,0 +1,108 @@ +pub mod grpc; +mod reload; +mod route; +mod publish; +mod load; + +use std::sync::Arc; + +use anyhow::Result; +use async_nats::jetstream::{consumer::{pull, Consumer}, Context}; +use tokio::signal; +use tracing::{error, trace}; +use warden_stack::{Configuration, tracing::SdkTracerProvider}; +use futures_util::StreamExt; + +use crate::{cnfg::Nats, state::{AppHandle, AppState, Services}}; + + +pub async fn serve( + services: Services, + config: Configuration, + provider: SdkTracerProvider, +) -> anyhow::Result<()> { + let state = Arc::new(AppState::new(services, config).await?); + + tokio::select! { + _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {} + _ = shutdown_signal(provider) => {} + }; + + Ok(()) +} + + +async fn run(state: AppHandle) -> anyhow::Result<()> { + let config = Arc::clone(&state); + let (consumer, _) = tokio::join!( + get_or_create_stream(&state.services.jetstream, &state.config.nats), + load::get_routing_config(Arc::clone(&config)) + ); + + let consumer = consumer?; + + // Consume messages from the consumer + while let Some(Ok(message)) = consumer.messages().await?.next().await { + if let Err(e) = route::route(message, Arc::clone(&state)).await { + error!("{}", e.to_string()); + } + } + + Ok(()) +} + +async fn get_or_create_stream( + jetstream: &Context, + nats: &Nats, +) -> anyhow::Result<Consumer<pull::Config>> { + trace!(name = ?nats.name, "getting or creating stream"); + let stream = jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: nats.name.to_string(), + subjects: nats.subjects.iter().map(Into::into).collect(), + max_messages: nats.max_messages, + ..Default::default() + }) + .await?; + let durable = nats.durable_name.to_string(); + // Get or create a pull-based consumer + Ok(stream + .get_or_create_consumer( + durable.as_ref(), + async_nats::jetstream::consumer::pull::Config { + durable_name: Some(durable.to_string()), + ..Default::default() + }, + ) + .await?) +} + + +async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + }, + _ = terminate => { + }, + } + let _ = provider.shutdown(); + + Ok(()) +} diff --git a/crates/router/src/processor/grpc.rs b/crates/router/src/processor/grpc.rs new file mode 100644 index 0000000..344f2a1 --- /dev/null +++ b/crates/router/src/processor/grpc.rs @@ -0,0 +1,27 @@ +pub mod interceptor { + use opentelemetry::global; + use tonic::{ + Status, + service::{Interceptor, interceptor::InterceptedService}, + transport::Channel, + }; + use tracing::Span; + use tracing_opentelemetry::OpenTelemetrySpanExt; + use warden_stack::tracing::telemetry::tonic::injector; + + pub type Intercepted = InterceptedService<Channel, MyInterceptor>; + + #[derive(Clone, Copy)] + pub struct MyInterceptor; + + impl Interceptor for MyInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + let cx = Span::current().context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::MetadataMap(request.metadata_mut())) + }); + + Ok(request) + } + } +} diff --git a/crates/router/src/processor/load.rs b/crates/router/src/processor/load.rs new file mode 100644 index 0000000..9d3fd0d --- /dev/null +++ b/crates/router/src/processor/load.rs @@ -0,0 +1,51 @@ +use opentelemetry_semantic_conventions::attribute; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{Instrument, debug, info, info_span, instrument, warn}; +use warden_core::{configuration::routing::RoutingConfiguration, google }; + +use crate::{cnfg::CACHE_KEY, state::AppHandle}; + +#[instrument(skip(state))] +pub async fn get_routing_config(state: AppHandle) -> Option<RoutingConfiguration> { + debug!("getting routing config"); + { + let span = info_span!("local_cache.get"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "moka"); + let local_cache = state.local_cache.read().await; + if let Some(value) = local_cache.get(&CACHE_KEY).await { + return Some(value); + } + } + + let mut client = state.query_routing_client.clone(); + + let span = info_span!("get.routing.config"); + span.set_attribute(attribute::RPC_SERVICE, env!("CARGO_PKG_NAME")); + span.set_attribute("otel.kind", "client"); + + if let Ok(config) = client + .get_active_routing_configuration(google::protobuf::Empty::default()) + .instrument(span) + .await + { + debug!("fetched routing config"); + let span = info_span!("local_cache.insert"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "moka"); + if let Some(config) = config.into_inner().configuration { + debug!("updating cache"); + let local_cache = state.local_cache.write().await; + local_cache + .insert(CACHE_KEY, config.clone()) + .instrument(span) + .await; + info!("cache refreshed"); + return Some(config); + } else { + warn!("no routing config is active"); + return None; + } + } else { + warn!("no routing config is active"); + return None; + } +} diff --git a/crates/router/src/processor/publish.rs b/crates/router/src/processor/publish.rs new file mode 100644 index 0000000..16dcec8 --- /dev/null +++ b/crates/router/src/processor/publish.rs @@ -0,0 +1,51 @@ +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{Instrument, Span, debug, info, info_span, warn}; +use warden_core::{configuration::routing::RoutingConfiguration, message::Payload}; +use warden_stack::tracing::telemetry::nats::injector; + +use crate::state::AppHandle; + +pub(crate) async fn to_rule( + (rule_id, rule_version): (&String, &str), + state: AppHandle, + mut payload: Payload, + routing: RoutingConfiguration, +) -> anyhow::Result<()> { + // send transaction to next with nats + let subject = format!( + "{}.{rule_id}.v{rule_version}", + state.config.nats.destination_prefix + ); + debug!(subject = ?subject, "publishing"); + + payload.routing = Some(routing); + + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + state + .services + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .instrument(span) + .await + .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?; + + info!("published to rule"); + + Ok(()) +} diff --git a/crates/router/src/processor/reload.rs b/crates/router/src/processor/reload.rs new file mode 100644 index 0000000..c75465c --- /dev/null +++ b/crates/router/src/processor/reload.rs @@ -0,0 +1,58 @@ +use futures_util::StreamExt; +use async_nats::jetstream::consumer; +use tracing::{trace, debug, error, info}; +use uuid::Uuid; +use warden_core::configuration::ReloadEvent; + +use crate::state::AppHandle; + +pub async fn reload(state: AppHandle) -> anyhow::Result<()> { + let id = Uuid::now_v7().to_string(); + info!(durable = id, "listening for configuration changes"); + + let durable = &id; + let consumer = state + .services + .jetstream + .get_stream(state.config.nats.config.stream.to_string()) + .await? + .get_or_create_consumer( + durable, + consumer::pull::Config { + durable_name: Some(durable.to_string()), + filter_subject: state.config.nats.config.reload_subject.to_string(), + deliver_policy: consumer::DeliverPolicy::LastPerSubject, + ..Default::default() + }, + ) + .await?; + + let mut messages = consumer.messages().await?; + while let Some(value) = messages.next().await { + match value { + Ok(message) => { + trace!("got reload cache event",); + if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec()) + .map(|value| ReloadEvent::from_str_name(&value)) + { + match event { + ReloadEvent::Routing => { + let local_cache = state.local_cache.write().await; + local_cache.invalidate_all(); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + _ => { + debug!(event = ?event, "detected reload event, acknowledging"); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + } + } + } + Err(e) => { + error!("{e}") + } + } + } + + Ok(()) +} diff --git a/crates/router/src/processor/route.rs b/crates/router/src/processor/route.rs new file mode 100644 index 0000000..404c2ca --- /dev/null +++ b/crates/router/src/processor/route.rs @@ -0,0 +1,91 @@ +use anyhow::Result; +use std::{collections::HashSet, sync::Arc}; + +use opentelemetry::global; +use prost::Message; +use tracing::{info_span, instrument, trace, trace_span, warn, Instrument, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::{google, message::Payload}; +use warden_stack::tracing::telemetry::nats; + +use crate::{cnfg::CACHE_KEY, processor::publish, state::AppHandle}; + +#[instrument(skip(message, state), err(Debug), fields(msg_id))] +pub async fn route(message: async_nats::jetstream::Message, state: AppHandle) -> Result<()> { + let span = Span::current(); + + if let Some(ref headers) = message.headers { + let context = global::get_text_map_propagator(|propagator| { + propagator.extract(&nats::extractor::HeaderMap(headers)) + }); + span.set_parent(context); + }; + + let payload: Payload = Message::decode(message.payload.as_ref())?; + + match payload.transaction { + Some(ref transaction) => { + let msg_id = match transaction { + warden_core::message::payload::Transaction::Pacs008(pacs008_document) => { + &pacs008_document.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id + } + warden_core::message::payload::Transaction::Pacs002(pacs002_document) => { + &pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id + } + }; + span.record("msg_id", msg_id); + + let routing = { + let local_cache = state.local_cache.read().await; + local_cache.get(&CACHE_KEY).await + }; + + let routing = match routing { + Some(local) => Some(local), + None => { + let span = trace_span!( + "get.active.routing", + "otel.kind" = "client", + "rpc.service" = "configuration" + ); + let mut client = state.query_routing_client.clone(); + client + .get_active_routing_configuration(google::protobuf::Empty::default()) + .instrument(span) + .await? + .into_inner() + .configuration + } + } + .ok_or_else(|| anyhow::anyhow!("no routing configuration available"))?; + + trace!(tx_tp = ?payload.tx_tp, "finding all rules from configuration"); + let set: HashSet<_> = routing + .messages + .iter() + .filter(|msg| msg.tx_tp == payload.tx_tp) + .flat_map(|msg| &msg.typologies) + .flat_map(|typ| &typ.rules) + .map(|rule| (&rule.id, rule.version())) + .collect(); + + let futs = set.into_iter().map(|value| { + publish::to_rule(value, Arc::clone(&state), payload.clone(), routing.clone()) + }); + + futures_util::future::join_all(futs).await; + } + None => { + warn!("transaction is empty - proceeding with ack"); + } + } + + let span = info_span!("nats.ack"); + message + .ack() + .instrument(span) + .await + .map_err(|_| anyhow::anyhow!("ack error"))?; + + Ok(()) +} diff --git a/crates/router/src/state.rs b/crates/router/src/state.rs new file mode 100644 index 0000000..e01629e --- /dev/null +++ b/crates/router/src/state.rs @@ -0,0 +1,55 @@ +use std::sync::Arc; + +use async_nats::jetstream::Context; +use moka::future::Cache; +use tokio::sync::RwLock; +use tonic::transport::Endpoint; +use tracing::error; +use warden_core::configuration::routing::{ + RoutingConfiguration, query_routing_client::QueryRoutingClient, +}; +use warden_stack::{Configuration}; + +use crate::{ + cnfg::LocalConfig, + processor::grpc::interceptor::{Intercepted, MyInterceptor}, +}; + +#[derive(Clone)] +pub struct Services { + pub jetstream: Context, +} + +pub type AppHandle = Arc<AppState>; + +#[derive(Clone)] +pub struct AppState { + pub services: Services, + pub local_cache: Arc<RwLock<Cache<i32, RoutingConfiguration>>>, + pub config: LocalConfig, + pub query_routing_client: QueryRoutingClient<Intercepted>, +} + +impl AppState { + pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> { + let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + let channel = Endpoint::new(config.config_endpoint.to_string())? + .connect() + .await + .inspect_err(|e| { + error!( + endpoint = ?config.config_endpoint, + "could not connect to config service: {e}", + ) + })?; + + let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor); + + Ok(Self { + services, + config, + local_cache: Arc::new(RwLock::new(Cache::builder().build())), + query_routing_client, + }) + } +} diff --git a/crates/warden/src/server/routes/processor/pacs002.rs b/crates/warden/src/server/routes/processor/pacs002.rs index 503939c..fbfda3c 100644 --- a/crates/warden/src/server/routes/processor/pacs002.rs +++ b/crates/warden/src/server/routes/processor/pacs002.rs @@ -189,6 +189,7 @@ pub async fn post_pacs002( transaction: Some(warden_core::message::payload::Transaction::Pacs002( request.clone(), )), + ..Default::default() }; publish_message(&state, payload, msg_id).await?; diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs index b7f495e..8d94ca9 100644 --- a/crates/warden/src/server/routes/processor/pacs008.rs +++ b/crates/warden/src/server/routes/processor/pacs008.rs @@ -175,6 +175,7 @@ pub(super) async fn post_pacs008( transaction.clone(), )), data_cache: Some(data_cache), + ..Default::default() }; publish_message(&state, payload, msg_id).await?; diff --git a/lib/warden-core/Cargo.toml b/lib/warden-core/Cargo.toml index dc9e83e..67ea279 100644 --- a/lib/warden-core/Cargo.toml +++ b/lib/warden-core/Cargo.toml @@ -24,7 +24,7 @@ utoipa = { workspace = true, optional = true } [features] default = [] configuration = ["dep:prost", "dep:tonic", "dep:tonic-types", "dep:tonic-prost"] -message = ["dep:prost", "dep:tonic", "dep:tonic-types", "dep:tonic-prost"] +message = ["configuration"] pseudonyms = ["dep:prost", "dep:tonic", "dep:tonic-types", "dep:tonic-prost"] serde = ["dep:serde", "serde/derive", "dep:serde_json"] serde-time = [ diff --git a/lib/warden-core/build.rs b/lib/warden-core/build.rs index 3992cd8..83b0407 100644 --- a/lib/warden-core/build.rs +++ b/lib/warden-core/build.rs @@ -20,10 +20,18 @@ impl Entity { #[cfg(feature = "configuration")] fn configuration_protos() -> Vec<&'static str> { + if cfg!(feature = "message") { + vec![ + "proto/configuration/reload_event.proto", + ] + } else { vec![ - "proto/configuration/routing.proto", + "proto/configuration/routing.proto", "proto/configuration/reload_event.proto", ] + + } + } #[cfg(feature = "pseudonyms")] diff --git a/proto/warden_message.proto b/proto/warden_message.proto index d9c0cd4..706139c 100644 --- a/proto/warden_message.proto +++ b/proto/warden_message.proto @@ -2,9 +2,10 @@ syntax = "proto3"; package message; -import "proto/iso20022/pacs_008_001_12.proto"; -import "proto/iso20022/pacs_002_001_12.proto"; +import "configuration/routing.proto"; import "google/protobuf/timestamp.proto"; +import "proto/iso20022/pacs_002_001_12.proto"; +import "proto/iso20022/pacs_008_001_12.proto"; message Payload { oneof transaction { @@ -13,6 +14,7 @@ message Payload { } DataCache data_cache = 3; string tx_tp = 4; + configuration.routing.RoutingConfiguration routing = 5; } message DataCache { |