diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-10 18:54:42 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-10 18:54:42 +0200 |
commit | 2b34f0ada4ffac6b38be80e4070c73402b92af48 (patch) | |
tree | 3ec0d623748b224c81757165169b910f692a398b | |
parent | a9aef1863ff0fe0422eee403f85644fc76952e80 (diff) | |
download | warden-2b34f0ada4ffac6b38be80e4070c73402b92af48.tar.bz2 warden-2b34f0ada4ffac6b38be80e4070c73402b92af48.zip |
feat(warden): publish message
-rw-r--r-- | Cargo.lock | 4 | ||||
-rw-r--r-- | contrib/docker-compose/compose.yaml | 13 | ||||
-rw-r--r-- | crates/warden/Cargo.toml | 6 | ||||
-rw-r--r-- | crates/warden/src/cnfg.rs | 6 | ||||
-rw-r--r-- | crates/warden/src/main.rs | 14 | ||||
-rw-r--r-- | crates/warden/src/server.rs | 1 | ||||
-rw-r--r-- | crates/warden/src/server/publish.rs | 41 | ||||
-rw-r--r-- | crates/warden/src/server/routes/processor/pacs008.rs | 18 | ||||
-rw-r--r-- | crates/warden/src/state.rs | 2 | ||||
-rw-r--r-- | crates/warden/warden.toml | 6 |
10 files changed, 103 insertions, 8 deletions
@@ -3787,11 +3787,14 @@ name = "warden" version = "0.1.0" dependencies = [ "anyhow", + "async-nats", "axum", "clap", "config", "metrics", "metrics-exporter-prometheus", + "opentelemetry", + "opentelemetry-semantic-conventions", "prost 0.14.1", "serde", "serde_json", @@ -3802,6 +3805,7 @@ dependencies = [ "tower", "tower-http", "tracing", + "tracing-opentelemetry", "utoipa", "utoipa-axum", "utoipa-rapidoc", diff --git a/contrib/docker-compose/compose.yaml b/contrib/docker-compose/compose.yaml index 54d09a0..d062365 100644 --- a/contrib/docker-compose/compose.yaml +++ b/contrib/docker-compose/compose.yaml @@ -34,8 +34,21 @@ services: timeout: 3s retries: 5 + nats: + image: nats:2.11-alpine + entrypoint: nats-server + command: ["--js", "-m", "8222"] + volumes: + - nats-data:/data + networks: + - warden + ports: + - "4222:4222" + - "8222:8222" + volumes: db: + nats-data: networks: warden: diff --git a/crates/warden/Cargo.toml b/crates/warden/Cargo.toml index 34da6f3..798dfdc 100644 --- a/crates/warden/Cargo.toml +++ b/crates/warden/Cargo.toml @@ -9,11 +9,14 @@ description.workspace = true [dependencies] anyhow.workspace = true +async-nats.workspace = true axum = { workspace = true, features = ["macros"] } clap = { workspace = true, features = ["derive"] } config = { workspace = true, features = ["convert-case", "toml"] } metrics.workspace = true metrics-exporter-prometheus.workspace = true +opentelemetry.workspace = true +opentelemetry-semantic-conventions.workspace = true prost.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true @@ -36,6 +39,7 @@ tower-http = { workspace = true, features = [ "request-id", ] } tracing.workspace = true +tracing-opentelemetry.workspace = true utoipa = { workspace = true, features = ["axum_extras"] } utoipa-axum.workspace = true utoipa-rapidoc = { workspace = true, optional = true } @@ -57,4 +61,4 @@ tower = { workspace = true, features = ["util"] } [dependencies.warden-stack] workspace = true -features = ["api", "cache", "opentelemetry", "postgres", "tracing-loki"] +features = ["api", "cache", "nats-jetstream", "opentelemetry", "postgres", "tracing-loki"] diff --git a/crates/warden/src/cnfg.rs b/crates/warden/src/cnfg.rs index 53af683..b2a1c60 100644 --- a/crates/warden/src/cnfg.rs +++ b/crates/warden/src/cnfg.rs @@ -6,4 +6,10 @@ pub struct LocalConfig { pub cache_ttl: u64, #[serde(rename = "pseudonyms-endpoint")] pub pseudonyms_endpoint: std::sync::Arc<str>, + pub nats: NatsConfig, +} + +#[derive(Deserialize, Clone)] +pub struct NatsConfig { + pub subject: std::sync::Arc<str>, } diff --git a/crates/warden/src/main.rs b/crates/warden/src/main.rs index f551664..49c171d 100644 --- a/crates/warden/src/main.rs +++ b/crates/warden/src/main.rs @@ -51,6 +51,9 @@ async fn main() -> Result<(), error::AppError> { .cache(&config.cache) .await .inspect_err(|e| error!("cache: {e}"))? + .nats_jetstream(&config.nats) + .await + .inspect_err(|e| error!("nats: {e}"))? .build(); let postgres = services @@ -63,7 +66,16 @@ async fn main() -> Result<(), error::AppError> { .take() .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; - let services = state::Services { postgres, cache }; + let jetstream = services + .jetstream + .take() + .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + + let services = state::Services { + postgres, + cache, + jetstream, + }; let state = AppState::create(services, &config).await?; diff --git a/crates/warden/src/server.rs b/crates/warden/src/server.rs index 032fe95..832c4ac 100644 --- a/crates/warden/src/server.rs +++ b/crates/warden/src/server.rs @@ -1,5 +1,6 @@ pub mod grpc; mod middleware; +mod publish; mod routes; use axum::Router; diff --git a/crates/warden/src/server/publish.rs b/crates/warden/src/server/publish.rs new file mode 100644 index 0000000..b3df0a7 --- /dev/null +++ b/crates/warden/src/server/publish.rs @@ -0,0 +1,41 @@ +use anyhow::Result; +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing::{Instrument, Span, info, info_span, trace}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::message::Payload; +use warden_stack::tracing::telemetry::nats::injector; + +use crate::state::AppHandle; + +pub async fn publish_message(state: &AppHandle, payload: Payload, msg_id: &str) -> Result<()> { + // send transaction to next with nats + let subject = format!("{}.{}", state.app_config.nats.subject, msg_id); + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + trace!(%msg_id, "publishing message"); + + state + .services + .jetstream + .publish_with_headers(subject, headers, payload.into()) + .instrument(span) + .await?; + + info!(%msg_id, "message published"); + + Ok(()) +} diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs index 2478cf0..a8c3e8c 100644 --- a/crates/warden/src/server/routes/processor/pacs008.rs +++ b/crates/warden/src/server/routes/processor/pacs008.rs @@ -1,6 +1,5 @@ use axum::{extract::State, response::IntoResponse}; -use std::sync::Arc; -use tracing::{Instrument, Span, debug, error, info_span, instrument, trace, warn}; +use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace, warn}; use uuid::Uuid; use warden_core::{ google::r#type::Money, @@ -13,7 +12,12 @@ use warden_stack::{ tracing_opentelemetry::OpenTelemetrySpanExt, }; -use crate::{error::AppError, server::routes::PACS008_001_12, state::AppHandle, version::Version}; +use crate::{ + error::AppError, + server::{publish::publish_message, routes::PACS008_001_12}, + state::AppHandle, + version::Version, +}; /// Submit a pacs.008.001.12 transaction #[utoipa::path( @@ -167,7 +171,7 @@ pub(super) async fn post_pacs008( .execute(&state.services.postgres) .instrument(span) .await?; - debug!(%id, %msg_id, "transaction added to history"); + info!(%id, %msg_id, "transaction added to history"); let payload = warden_core::message::Payload { tx_tp: tx_tp.to_string(), @@ -175,10 +179,12 @@ pub(super) async fn post_pacs008( transaction.clone(), )), data_cache: Some(data_cache), - ..Default::default() }; - Ok(String::default()) + publish_message(&state, payload, msg_id).await?; + trace!(%msg_id, "published transaction to stream"); + + Ok((axum::http::StatusCode::CREATED, axum::Json(transaction))) } pub fn build_data_cache(transaction: &Pacs008Document) -> anyhow::Result<DataCache> { diff --git a/crates/warden/src/state.rs b/crates/warden/src/state.rs index eac014a..eebb56b 100644 --- a/crates/warden/src/state.rs +++ b/crates/warden/src/state.rs @@ -1,3 +1,4 @@ +use async_nats::jetstream::Context; use sqlx::PgPool; use std::{ops::Deref, sync::Arc}; use tonic::transport::Endpoint; @@ -26,6 +27,7 @@ impl Deref for AppHandle { pub struct Services { pub postgres: PgPool, pub cache: RedisManager, + pub jetstream: Context, } pub struct AppState { diff --git a/crates/warden/warden.toml b/crates/warden/warden.toml index 00f76b8..a9539c5 100644 --- a/crates/warden/warden.toml +++ b/crates/warden/warden.toml @@ -6,6 +6,9 @@ port = 2210 cache-ttl = 1000 pseudonyms-endpoint = "http://localhost:1610" +[misc.nats] +subject = "iso20022" + [monitoring] log-level = "warden=trace,info" opentelemetry-endpoint = "http://localhost:4317" @@ -19,6 +22,9 @@ host = "localhost" password = "password" user = "postgres" +[nats] +hosts = ["nats://localhost:4222"] + [cache] dsn = "redis://localhost:6379" pooled = true |