author    rtkay123 <dev@kanjala.com>    2025-08-10 22:13:40 +0200
committer rtkay123 <dev@kanjala.com>    2025-08-10 22:13:40 +0200
commit    1479133bc9fc39159f7b37dba1fd966cd0124574 (patch)
tree      f437e1c654b3fb6bb6e131574e1d2aea00431d28
parent    cbfe5f42f5d17faca06d3583cb61c652a7184590 (diff)
feat(warden): save clearance
-rw-r--r--  .sqlx/query-37a548705f84beb612bb984fcaecb39d2b5560eb7cbd0cc9d496c819b88d8c52.json   15
-rw-r--r--  .sqlx/query-dee04f022231e4b4b5b4a9941acf891583af2d241615045d788c7a54093f6091.json   28
-rw-r--r--  Cargo.lock                                               1
-rw-r--r--  crates/warden/Cargo.toml                                 2
-rw-r--r--  crates/warden/src/server/routes.rs                      16
-rw-r--r--  crates/warden/src/server/routes/processor.rs             2
-rw-r--r--  crates/warden/src/server/routes/processor/pacs002.rs   233
-rw-r--r--  crates/warden/src/server/routes/processor/pacs008.rs     8
8 files changed, 292 insertions(+), 13 deletions(-)
diff --git a/.sqlx/query-37a548705f84beb612bb984fcaecb39d2b5560eb7cbd0cc9d496c819b88d8c52.json b/.sqlx/query-37a548705f84beb612bb984fcaecb39d2b5560eb7cbd0cc9d496c819b88d8c52.json
new file mode 100644
index 0000000..6301b0f
--- /dev/null
+++ b/.sqlx/query-37a548705f84beb612bb984fcaecb39d2b5560eb7cbd0cc9d496c819b88d8c52.json
@@ -0,0 +1,15 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "insert into pacs002 (id, document) values ($1, $2)",
+ "describe": {
+ "columns": [],
+ "parameters": {
+ "Left": [
+ "Uuid",
+ "Jsonb"
+ ]
+ },
+ "nullable": []
+ },
+ "hash": "37a548705f84beb612bb984fcaecb39d2b5560eb7cbd0cc9d496c819b88d8c52"
+}
diff --git a/.sqlx/query-dee04f022231e4b4b5b4a9941acf891583af2d241615045d788c7a54093f6091.json b/.sqlx/query-dee04f022231e4b4b5b4a9941acf891583af2d241615045d788c7a54093f6091.json
new file mode 100644
index 0000000..29d2a8c
--- /dev/null
+++ b/.sqlx/query-dee04f022231e4b4b5b4a9941acf891583af2d241615045d788c7a54093f6091.json
@@ -0,0 +1,28 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "select id, document as \"document: sqlx::types::Json<serde_json::Value>\" from pacs008 where exists (\n select 1\n from jsonb_array_elements(document->'f_i_to_f_i_cstmr_cdt_trf'->'cdt_trf_tx_inf') as elem\n where elem->'pmt_id'->>'end_to_end_id' = $1\n ) limit 1",
+ "describe": {
+ "columns": [
+ {
+ "ordinal": 0,
+ "name": "id",
+ "type_info": "Uuid"
+ },
+ {
+ "ordinal": 1,
+ "name": "document: sqlx::types::Json<serde_json::Value>",
+ "type_info": "Jsonb"
+ }
+ ],
+ "parameters": {
+ "Left": [
+ "Text"
+ ]
+ },
+ "nullable": [
+ false,
+ false
+ ]
+ },
+ "hash": "dee04f022231e4b4b5b4a9941acf891583af2d241615045d788c7a54093f6091"
+}
diff --git a/Cargo.lock b/Cargo.lock
index 15569ce..c306dc6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3742,6 +3742,7 @@ checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
dependencies = [
"getrandom 0.3.3",
"js-sys",
+ "serde",
"wasm-bindgen",
]
diff --git a/crates/warden/Cargo.toml b/crates/warden/Cargo.toml
index 7f4a4d6..5a40ee5 100644
--- a/crates/warden/Cargo.toml
+++ b/crates/warden/Cargo.toml
@@ -46,7 +46,7 @@ utoipa-rapidoc = { workspace = true, optional = true }
utoipa-redoc = { workspace = true, optional = true }
utoipa-scalar = { workspace = true, optional = true }
utoipa-swagger-ui = { workspace = true, optional = true }
-uuid = { workspace = true, features = ["v7"] }
+uuid = { workspace = true, features = ["v7", "serde"] }
warden-core = { workspace = true, features = ["message", "pseudonyms", "serde", "openapi"] }
[features]
diff --git a/crates/warden/src/server/routes.rs b/crates/warden/src/server/routes.rs
index c03a972..d3b05c6 100644
--- a/crates/warden/src/server/routes.rs
+++ b/crates/warden/src/server/routes.rs
@@ -21,16 +21,24 @@ mod tests {
body::Body,
http::{Request, StatusCode},
};
+ use sqlx::PgPool;
use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
use crate::{
server::{self, test_config},
- state::AppState,
+ state::{AppState, Services},
};
- #[tokio::test]
- async fn health_check() {
- let state = AppState::create(&test_config()).await.unwrap();
+ #[sqlx::test]
+ async fn health_check(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(Services { postgres: pool, cache, jetstream }, &config).await.unwrap();
let app = server::router(state);
let response = app
diff --git a/crates/warden/src/server/routes/processor.rs b/crates/warden/src/server/routes/processor.rs
index d1f062a..ee865d3 100644
--- a/crates/warden/src/server/routes/processor.rs
+++ b/crates/warden/src/server/routes/processor.rs
@@ -1,3 +1,4 @@
+mod pacs002;
mod pacs008;
use utoipa_axum::{router::OpenApiRouter, routes};
@@ -7,5 +8,6 @@ use crate::state::AppHandle;
pub fn router(store: AppHandle) -> OpenApiRouter {
OpenApiRouter::new()
.routes(routes!(pacs008::post_pacs008))
+ .routes(routes!(pacs002::post_pacs002))
.with_state(store)
}
diff --git a/crates/warden/src/server/routes/processor/pacs002.rs b/crates/warden/src/server/routes/processor/pacs002.rs
new file mode 100644
index 0000000..7a15d7a
--- /dev/null
+++ b/crates/warden/src/server/routes/processor/pacs002.rs
@@ -0,0 +1,233 @@
+use axum::{extract::State, response::IntoResponse};
+use opentelemetry_semantic_conventions::attribute;
+use prost::Message as _;
+use serde::Serialize;
+use tracing::{Instrument, debug, error, info, info_span, trace};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use uuid::Uuid;
+use warden_core::{
+ google::r#type::Money,
+ iso20022::{TransactionType, pacs002::Pacs002Document},
+ message::{DataCache, Payload},
+ pseudonyms::transaction_relationship::{CreatePseudonymRequest, TransactionRelationship},
+};
+use warden_stack::redis::AsyncCommands;
+
+use crate::{error::AppError, server::{publish::publish_message, routes::{processor::pacs008::{build_data_cache, set_cache}, PACS002_001_12}}, state::AppHandle, version::Version};
+
+#[derive(Serialize)]
+struct Row {
+ id: Uuid,
+ document: sqlx::types::Json<serde_json::Value>,
+}
+
+/// Submit a pacs.002.001.12 transaction
+#[utoipa::path(
+ post,
+ responses((
+ status = CREATED,
+ body = Pacs002Document
+ )),
+ operation_id = "post_pacs_002", // https://github.com/juhaku/utoipa/issues/1170
+ path = "/{version}/pacs002",
+ params(
+ ("version" = Version, Path, description = "API version, e.g., v1, v2, v3")
+ ),
+ tag = PACS002_001_12,
+ request_body(
+ content = Pacs002Document
+ ))
+]
+#[tracing::instrument(
+ skip(state, request),
+ err(Debug),
+ fields(method = "POST", end_to_end_id, msg_id, tx_tp)
+)]
+pub async fn post_pacs002(
+ State(state): State<AppHandle>,
+ axum::Json(request): axum::Json<Pacs002Document>,
+) -> Result<impl IntoResponse, AppError> {
+ let tx_tp = TransactionType::PACS002.to_string();
+ tracing::Span::current().record("tx_tp", &tx_tp);
+
+ let cre_dt_tm = request.f_i_to_f_i_pmt_sts_rpt.grp_hdr.cre_dt_tm;
+ let end_to_end_id = request.f_i_to_f_i_pmt_sts_rpt.tx_inf_and_sts[0]
+ .orgnl_end_to_end_id
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("end_to_end_id is expected"))?;
+ tracing::Span::current().record("end_to_end_id", end_to_end_id);
+
+ let msg_id = &request.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id;
+ tracing::Span::current().record("msg_id", msg_id);
+
+ let pmt_inf_id = &request.f_i_to_f_i_pmt_sts_rpt.tx_inf_and_sts[0].orgnl_instr_id;
+ let tx_sts = &request.f_i_to_f_i_pmt_sts_rpt.tx_inf_and_sts[0].tx_sts;
+
+ let mut cache = state.services.cache.get().await?;
+ trace!(end_to_end_id = end_to_end_id, "getting data cache");
+ let cache = cache
+ .get::<_, Vec<u8>>(&end_to_end_id)
+ .await
+ .map(|value| DataCache::decode(value.as_ref()));
+
+ let data_cache = match cache {
+ Ok(Ok(data_cache)) => {
+ debug!(end_to_end_id = end_to_end_id, "cache hit");
+ rebuild_entities(
+ end_to_end_id,
+ &state,
+ Some(data_cache),
+ )
+ .await?
+ }
+ _ => {
+ debug!(end_to_end_id = end_to_end_id, "cache miss");
+ rebuild_entities(end_to_end_id, &state, None).await?
+ }
+ };
+
+ let amount = data_cache.instd_amt.as_ref().map(|value| value.value);
+
+ let ccy = data_cache
+ .instd_amt
+ .as_ref()
+ .map(|value| value.ccy.as_str());
+
+ debug!(%msg_id, %end_to_end_id, "parsed transaction identifiers");
+
+ let money = if let (Some(amt), Some(ccy)) = (amount, ccy) {
+ Some(Money::try_from((amt, ccy)).map_err(|_e| anyhow::anyhow!("invalid currency"))?)
+ } else {
+ trace!(msg_id, "transaction has no amount or currency");
+ None
+ };
+
+ let transaction_relationship = TransactionRelationship {
+ from: data_cache.cdtr_acct_id.to_string(),
+ to: data_cache.dbtr_acct_id.to_string(),
+ amt: money,
+ cre_dt_tm: Some(cre_dt_tm),
+ end_to_end_id: end_to_end_id.to_string(),
+ msg_id: msg_id.to_string(),
+ pmt_inf_id: pmt_inf_id
+ .as_ref()
+ .ok_or_else(|| {
+ error!("missing pmt_inf_id");
+ anyhow::anyhow!("missing pmt_inf_id")
+ })?
+ .to_string(),
+ tx_tp: tx_tp.to_string(),
+ tx_sts: tx_sts.clone(),
+ ..Default::default()
+ };
+
+ debug!(%msg_id, %end_to_end_id, "constructed transaction relationship");
+
+ // TODO: remove debtor_account_id from create request, use from TR
+ trace!("updating pseudonyms");
+
+ let pseudonyms_request = CreatePseudonymRequest {
+ transaction_relationship: Some(transaction_relationship),
+ debtor_id: data_cache.dbtr_id.to_string(),
+ debtor_account_id: data_cache.dbtr_acct_id.to_string(),
+ creditor_id: data_cache.cdtr_id.to_string(),
+ creditor_account_id: data_cache.cdtr_acct_id.to_string(),
+ };
+
+ let mut pseudonyms_client = state.mutate_pseudonym_client.clone();
+
+ let pseudonyms_fut = async {
+ debug!("creating pseudonyms");
+ let span = info_span!("create.pseudonyms.account");
+ span.set_attribute(attribute::RPC_SERVICE, "pseudonyms");
+ pseudonyms_client
+ .create_pseudonym(pseudonyms_request)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!(error = %e, "failed to create pseudonyms");
+ anyhow::anyhow!("could not create pseudonyms")
+ })
+ };
+
+ let id = Uuid::now_v7();
+
+ let tr_fut = async {
+ let span = info_span!("create.transaction_history.pacs002");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "pacs002");
+
+ sqlx::query!(
+ "insert into pacs002 (id, document) values ($1, $2)",
+ id,
+ sqlx::types::Json(&request) as _
+ )
+ .execute(&state.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ anyhow::anyhow!("could not insert transaction_history")
+ })
+ };
+ let (_result, _resp) = tokio::try_join!(tr_fut, pseudonyms_fut)?;
+ debug!(%id, %msg_id, %tx_tp, "transaction added to history");
+
+ trace!(%msg_id, "publishing payload to router");
+
+ let payload = Payload {
+ tx_tp: tx_tp.to_string(),
+ data_cache: Some(data_cache),
+ transaction: Some(warden_core::message::payload::Transaction::Pacs002(
+ request.clone(),
+ )),
+ ..Default::default()
+ };
+
+ publish_message(&state, payload, msg_id).await?;
+ info!(%msg_id, "published transaction to router");
+ Ok((axum::http::StatusCode::CREATED, axum::Json(request)))
+}
+
+#[tracing::instrument(skip(state, data_cache))]
+async fn rebuild_entities(
+ end_to_end_id: &str,
+ state: &AppHandle,
+ mut data_cache: Option<DataCache>,
+) -> anyhow::Result<DataCache> {
+ let span = info_span!("get.transaction_history.pacs008");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "select");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "pacs008");
+ span.set_attribute(attribute::DB_OPERATION_PARAMETER, end_to_end_id.to_string());
+ tracing::info!(end_to_end = end_to_end_id, "rebuilding cache");
+
+ let transaction = sqlx::query_as!(
+ Row,
+ r#"select id, document as "document: sqlx::types::Json<serde_json::Value>" from pacs008 where exists (
+ select 1
+ from jsonb_array_elements(document->'f_i_to_f_i_cstmr_cdt_trf'->'cdt_trf_tx_inf') as elem
+ where elem->'pmt_id'->>'end_to_end_id' = $1
+ ) limit 1"#,
+ end_to_end_id
+ )
+ .fetch_one(&state.services.postgres)
+ .instrument(span)
+ .await?;
+
+ debug!(id = ?transaction.id, "found transaction");
+
+ let document = serde_json::from_value(transaction.document.0)?;
+
+ if data_cache.is_none() {
+ debug!(e2e_id = end_to_end_id, "attempting to rebuild data cache");
+ let data_cache_value = build_data_cache(&document)?;
+
+ set_cache(end_to_end_id, state, &data_cache_value).await?;
+
+ let _old_value = data_cache.replace(data_cache_value);
+ };
+
+ data_cache.ok_or_else(|| anyhow::anyhow!("no pacs008 found"))
+}
diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs
index a8c3e8c..64a1029 100644
--- a/crates/warden/src/server/routes/processor/pacs008.rs
+++ b/crates/warden/src/server/routes/processor/pacs008.rs
@@ -74,14 +74,6 @@ pub(super) async fn post_pacs008(
let end_to_end_id = cdt_trf_tx_inf
.as_ref()
.map(|value| value.pmt_id.end_to_end_id.as_str())
- .ok_or_else(|| anyhow::anyhow!("missing end_to_end_id id"))?;
-
- let ccy =
- cdt_trf_tx_inf.and_then(|value| value.instd_amt.as_ref().map(|value| value.ccy.as_str()));
-
- let end_to_end_id = cdt_trf_tx_inf
- .as_ref()
- .map(|value| value.pmt_id.end_to_end_id.as_str())
.ok_or_else(|| {
error!("missing end_to_end_id");
anyhow::anyhow!("missing end_to_end_id id")