aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-10 18:54:42 +0200
committerrtkay123 <dev@kanjala.com>2025-08-10 18:54:42 +0200
commit2b34f0ada4ffac6b38be80e4070c73402b92af48 (patch)
tree3ec0d623748b224c81757165169b910f692a398b
parenta9aef1863ff0fe0422eee403f85644fc76952e80 (diff)
downloadwarden-2b34f0ada4ffac6b38be80e4070c73402b92af48.tar.bz2
warden-2b34f0ada4ffac6b38be80e4070c73402b92af48.zip
feat(warden): publish message
-rw-r--r--Cargo.lock4
-rw-r--r--contrib/docker-compose/compose.yaml13
-rw-r--r--crates/warden/Cargo.toml6
-rw-r--r--crates/warden/src/cnfg.rs6
-rw-r--r--crates/warden/src/main.rs14
-rw-r--r--crates/warden/src/server.rs1
-rw-r--r--crates/warden/src/server/publish.rs41
-rw-r--r--crates/warden/src/server/routes/processor/pacs008.rs18
-rw-r--r--crates/warden/src/state.rs2
-rw-r--r--crates/warden/warden.toml6
10 files changed, 103 insertions, 8 deletions
diff --git a/Cargo.lock b/Cargo.lock
index aca3cf5..15569ce 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3787,11 +3787,14 @@ name = "warden"
version = "0.1.0"
dependencies = [
"anyhow",
+ "async-nats",
"axum",
"clap",
"config",
"metrics",
"metrics-exporter-prometheus",
+ "opentelemetry",
+ "opentelemetry-semantic-conventions",
"prost 0.14.1",
"serde",
"serde_json",
@@ -3802,6 +3805,7 @@ dependencies = [
"tower",
"tower-http",
"tracing",
+ "tracing-opentelemetry",
"utoipa",
"utoipa-axum",
"utoipa-rapidoc",
diff --git a/contrib/docker-compose/compose.yaml b/contrib/docker-compose/compose.yaml
index 54d09a0..d062365 100644
--- a/contrib/docker-compose/compose.yaml
+++ b/contrib/docker-compose/compose.yaml
@@ -34,8 +34,21 @@ services:
timeout: 3s
retries: 5
+ nats:
+ image: nats:2.11-alpine
+ entrypoint: nats-server
+ command: ["--js", "-m", "8222"]
+ volumes:
+ - nats-data:/data
+ networks:
+ - warden
+ ports:
+ - "4222:4222"
+ - "8222:8222"
+
volumes:
db:
+ nats-data:
networks:
warden:
diff --git a/crates/warden/Cargo.toml b/crates/warden/Cargo.toml
index 34da6f3..798dfdc 100644
--- a/crates/warden/Cargo.toml
+++ b/crates/warden/Cargo.toml
@@ -9,11 +9,14 @@ description.workspace = true
[dependencies]
anyhow.workspace = true
+async-nats.workspace = true
axum = { workspace = true, features = ["macros"] }
clap = { workspace = true, features = ["derive"] }
config = { workspace = true, features = ["convert-case", "toml"] }
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
+opentelemetry.workspace = true
+opentelemetry-semantic-conventions.workspace = true
prost.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
@@ -36,6 +39,7 @@ tower-http = { workspace = true, features = [
"request-id",
] }
tracing.workspace = true
+tracing-opentelemetry.workspace = true
utoipa = { workspace = true, features = ["axum_extras"] }
utoipa-axum.workspace = true
utoipa-rapidoc = { workspace = true, optional = true }
@@ -57,4 +61,4 @@ tower = { workspace = true, features = ["util"] }
[dependencies.warden-stack]
workspace = true
-features = ["api", "cache", "opentelemetry", "postgres", "tracing-loki"]
+features = ["api", "cache", "nats-jetstream", "opentelemetry", "postgres", "tracing-loki"]
diff --git a/crates/warden/src/cnfg.rs b/crates/warden/src/cnfg.rs
index 53af683..b2a1c60 100644
--- a/crates/warden/src/cnfg.rs
+++ b/crates/warden/src/cnfg.rs
@@ -6,4 +6,10 @@ pub struct LocalConfig {
pub cache_ttl: u64,
#[serde(rename = "pseudonyms-endpoint")]
pub pseudonyms_endpoint: std::sync::Arc<str>,
+ pub nats: NatsConfig,
+}
+
+#[derive(Deserialize, Clone)]
+pub struct NatsConfig {
+ pub subject: std::sync::Arc<str>,
}
diff --git a/crates/warden/src/main.rs b/crates/warden/src/main.rs
index f551664..49c171d 100644
--- a/crates/warden/src/main.rs
+++ b/crates/warden/src/main.rs
@@ -51,6 +51,9 @@ async fn main() -> Result<(), error::AppError> {
.cache(&config.cache)
.await
.inspect_err(|e| error!("cache: {e}"))?
+ .nats_jetstream(&config.nats)
+ .await
+ .inspect_err(|e| error!("nats: {e}"))?
.build();
let postgres = services
@@ -63,7 +66,16 @@ async fn main() -> Result<(), error::AppError> {
.take()
.ok_or_else(|| anyhow::anyhow!("cache is not ready"))?;
- let services = state::Services { postgres, cache };
+ let jetstream = services
+ .jetstream
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?;
+
+ let services = state::Services {
+ postgres,
+ cache,
+ jetstream,
+ };
let state = AppState::create(services, &config).await?;
diff --git a/crates/warden/src/server.rs b/crates/warden/src/server.rs
index 032fe95..832c4ac 100644
--- a/crates/warden/src/server.rs
+++ b/crates/warden/src/server.rs
@@ -1,5 +1,6 @@
pub mod grpc;
mod middleware;
+mod publish;
mod routes;
use axum::Router;
diff --git a/crates/warden/src/server/publish.rs b/crates/warden/src/server/publish.rs
new file mode 100644
index 0000000..b3df0a7
--- /dev/null
+++ b/crates/warden/src/server/publish.rs
@@ -0,0 +1,41 @@
+use anyhow::Result;
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing::{Instrument, Span, info, info_span, trace};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::message::Payload;
+use warden_stack::tracing::telemetry::nats::injector;
+
+use crate::state::AppHandle;
+
+pub async fn publish_message(state: &AppHandle, payload: Payload, msg_id: &str) -> Result<()> {
+ // send transaction to next with nats
+ let subject = format!("{}.{}", state.app_config.nats.subject, msg_id);
+ let payload = prost::Message::encode_to_vec(&payload);
+
+ let mut headers = async_nats::HeaderMap::new();
+
+ let cx = Span::current().context();
+
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers))
+ });
+
+ let span = info_span!("nats.publish");
+ span.set_attribute(
+ attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
+ subject.to_string(),
+ );
+ trace!(%msg_id, "publishing message");
+
+ state
+ .services
+ .jetstream
+ .publish_with_headers(subject, headers, payload.into())
+ .instrument(span)
+ .await?;
+
+ info!(%msg_id, "message published");
+
+ Ok(())
+}
diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs
index 2478cf0..a8c3e8c 100644
--- a/crates/warden/src/server/routes/processor/pacs008.rs
+++ b/crates/warden/src/server/routes/processor/pacs008.rs
@@ -1,6 +1,5 @@
use axum::{extract::State, response::IntoResponse};
-use std::sync::Arc;
-use tracing::{Instrument, Span, debug, error, info_span, instrument, trace, warn};
+use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace, warn};
use uuid::Uuid;
use warden_core::{
google::r#type::Money,
@@ -13,7 +12,12 @@ use warden_stack::{
tracing_opentelemetry::OpenTelemetrySpanExt,
};
-use crate::{error::AppError, server::routes::PACS008_001_12, state::AppHandle, version::Version};
+use crate::{
+ error::AppError,
+ server::{publish::publish_message, routes::PACS008_001_12},
+ state::AppHandle,
+ version::Version,
+};
/// Submit a pacs.008.001.12 transaction
#[utoipa::path(
@@ -167,7 +171,7 @@ pub(super) async fn post_pacs008(
.execute(&state.services.postgres)
.instrument(span)
.await?;
- debug!(%id, %msg_id, "transaction added to history");
+ info!(%id, %msg_id, "transaction added to history");
let payload = warden_core::message::Payload {
tx_tp: tx_tp.to_string(),
@@ -175,10 +179,12 @@ pub(super) async fn post_pacs008(
transaction.clone(),
)),
data_cache: Some(data_cache),
- ..Default::default()
};
- Ok(String::default())
+ publish_message(&state, payload, msg_id).await?;
+ trace!(%msg_id, "published transaction to stream");
+
+ Ok((axum::http::StatusCode::CREATED, axum::Json(transaction)))
}
pub fn build_data_cache(transaction: &Pacs008Document) -> anyhow::Result<DataCache> {
diff --git a/crates/warden/src/state.rs b/crates/warden/src/state.rs
index eac014a..eebb56b 100644
--- a/crates/warden/src/state.rs
+++ b/crates/warden/src/state.rs
@@ -1,3 +1,4 @@
+use async_nats::jetstream::Context;
use sqlx::PgPool;
use std::{ops::Deref, sync::Arc};
use tonic::transport::Endpoint;
@@ -26,6 +27,7 @@ impl Deref for AppHandle {
pub struct Services {
pub postgres: PgPool,
pub cache: RedisManager,
+ pub jetstream: Context,
}
pub struct AppState {
diff --git a/crates/warden/warden.toml b/crates/warden/warden.toml
index 00f76b8..a9539c5 100644
--- a/crates/warden/warden.toml
+++ b/crates/warden/warden.toml
@@ -6,6 +6,9 @@ port = 2210
cache-ttl = 1000
pseudonyms-endpoint = "http://localhost:1610"
+[misc.nats]
+subject = "iso20022"
+
[monitoring]
log-level = "warden=trace,info"
opentelemetry-endpoint = "http://localhost:4317"
@@ -19,6 +22,9 @@ host = "localhost"
password = "password"
user = "postgres"
+[nats]
+hosts = ["nats://localhost:4222"]
+
[cache]
dsn = "redis://localhost:6379"
pooled = true