diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-10 22:13:40 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-10 22:13:40 +0200 |
commit | 1479133bc9fc39159f7b37dba1fd966cd0124574 (patch) | |
tree | f437e1c654b3fb6bb6e131574e1d2aea00431d28 | |
parent | cbfe5f42f5d17faca06d3583cb61c652a7184590 (diff) | |
download | warden-1479133bc9fc39159f7b37dba1fd966cd0124574.tar.bz2 warden-1479133bc9fc39159f7b37dba1fd966cd0124574.zip |
feat(warden): save clearance
-rw-r--r-- | .sqlx/query-37a548705f84beb612bb984fcaecb39d2b5560eb7cbd0cc9d496c819b88d8c52.json | 15 | ||||
-rw-r--r-- | .sqlx/query-dee04f022231e4b4b5b4a9941acf891583af2d241615045d788c7a54093f6091.json | 28 | ||||
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | crates/warden/Cargo.toml | 2 | ||||
-rw-r--r-- | crates/warden/src/server/routes.rs | 16 | ||||
-rw-r--r-- | crates/warden/src/server/routes/processor.rs | 2 | ||||
-rw-r--r-- | crates/warden/src/server/routes/processor/pacs002.rs | 233 | ||||
-rw-r--r-- | crates/warden/src/server/routes/processor/pacs008.rs | 8 |
8 files changed, 292 insertions, 13 deletions
diff --git a/.sqlx/query-37a548705f84beb612bb984fcaecb39d2b5560eb7cbd0cc9d496c819b88d8c52.json b/.sqlx/query-37a548705f84beb612bb984fcaecb39d2b5560eb7cbd0cc9d496c819b88d8c52.json new file mode 100644 index 0000000..6301b0f --- /dev/null +++ b/.sqlx/query-37a548705f84beb612bb984fcaecb39d2b5560eb7cbd0cc9d496c819b88d8c52.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "insert into pacs002 (id, document) values ($1, $2)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Jsonb" + ] + }, + "nullable": [] + }, + "hash": "37a548705f84beb612bb984fcaecb39d2b5560eb7cbd0cc9d496c819b88d8c52" +} diff --git a/.sqlx/query-dee04f022231e4b4b5b4a9941acf891583af2d241615045d788c7a54093f6091.json b/.sqlx/query-dee04f022231e4b4b5b4a9941acf891583af2d241615045d788c7a54093f6091.json new file mode 100644 index 0000000..29d2a8c --- /dev/null +++ b/.sqlx/query-dee04f022231e4b4b5b4a9941acf891583af2d241615045d788c7a54093f6091.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "select id, document as \"document: sqlx::types::Json<serde_json::Value>\" from pacs008 where exists (\n select 1\n from jsonb_array_elements(document->'f_i_to_f_i_cstmr_cdt_trf'->'cdt_trf_tx_inf') as elem\n where elem->'pmt_id'->>'end_to_end_id' = $1\n ) limit 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "document: sqlx::types::Json<serde_json::Value>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "dee04f022231e4b4b5b4a9941acf891583af2d241615045d788c7a54093f6091" +} @@ -3742,6 +3742,7 @@ checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" dependencies = [ "getrandom 0.3.3", "js-sys", + "serde", "wasm-bindgen", ] diff --git a/crates/warden/Cargo.toml b/crates/warden/Cargo.toml index 7f4a4d6..5a40ee5 100644 --- a/crates/warden/Cargo.toml +++ b/crates/warden/Cargo.toml @@ -46,7 +46,7 @@ utoipa-rapidoc = { workspace = true, optional = true } utoipa-redoc = { workspace = true, optional = true } utoipa-scalar = { workspace = true, optional = true } utoipa-swagger-ui = { workspace = true, optional = true } -uuid = { workspace = true, features = ["v7"] } +uuid = { workspace = true, features = ["v7", "serde"] } warden-core = { workspace = true, features = ["message", "pseudonyms", "serde", "openapi"] } [features] diff --git a/crates/warden/src/server/routes.rs b/crates/warden/src/server/routes.rs index c03a972..d3b05c6 100644 --- a/crates/warden/src/server/routes.rs +++ b/crates/warden/src/server/routes.rs @@ -21,16 +21,24 @@ mod tests { body::Body, http::{Request, StatusCode}, }; + use sqlx::PgPool; use tower::ServiceExt; + use warden_stack::cache::RedisManager; use crate::{ server::{self, test_config}, - state::AppState, + state::{AppState, Services}, }; - #[tokio::test] - async fn health_check() { - let state = AppState::create(&test_config()).await.unwrap(); + #[sqlx::test] + async fn health_check(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create(Services {postgres: pool,cache, jetstream}, &test_config()).await.unwrap(); let app = server::router(state); let response = app diff --git a/crates/warden/src/server/routes/processor.rs b/crates/warden/src/server/routes/processor.rs index d1f062a..ee865d3 100644 --- a/crates/warden/src/server/routes/processor.rs +++ b/crates/warden/src/server/routes/processor.rs @@ -1,3 +1,4 @@ +mod pacs002; mod pacs008; use utoipa_axum::{router::OpenApiRouter, routes}; @@ -7,5 +8,6 @@ use crate::state::AppHandle; pub fn router(store: AppHandle) -> OpenApiRouter { OpenApiRouter::new() .routes(routes!(pacs008::post_pacs008)) + .routes(routes!(pacs002::post_pacs002)) .with_state(store) } diff --git a/crates/warden/src/server/routes/processor/pacs002.rs b/crates/warden/src/server/routes/processor/pacs002.rs new file mode 100644 index 0000000..7a15d7a --- /dev/null +++ b/crates/warden/src/server/routes/processor/pacs002.rs @@ -0,0 +1,233 @@ +use axum::{extract::State, response::IntoResponse}; +use opentelemetry_semantic_conventions::attribute; +use prost::Message as _; +use serde::Serialize; +use tracing::{Instrument, debug, error, info, info_span, trace}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; +use warden_core::{ + google::r#type::Money, + iso20022::{TransactionType, pacs002::Pacs002Document}, + message::{DataCache, Payload}, + pseudonyms::transaction_relationship::{CreatePseudonymRequest, TransactionRelationship}, +}; +use warden_stack::redis::AsyncCommands; + +use crate::{error::AppError, server::{publish::publish_message, routes::{processor::pacs008::{build_data_cache, set_cache}, PACS002_001_12}}, state::AppHandle, version::Version}; + +#[derive(Serialize)] +struct Row { + id: Uuid, + document: sqlx::types::Json<serde_json::Value>, +} + +/// Submit a pacs.002.001.12 transaction +#[utoipa::path( + post, + responses(( + status = CREATED, + body = Pacs002Document + )), + operation_id = "post_pacs_002", // https://github.com/juhaku/utoipa/issues/1170 + path = "/{version}/pacs002", + params( + ("version" = Version, Path, description = "API version, e.g., v1, v2, v3") + ), + tag = PACS002_001_12, + request_body( + content = Pacs002Document + )) +] +#[tracing::instrument( + skip(state, request), + err(Debug), + fields(method = "POST", end_to_end_id, msg_id, tx_tp) +)] +pub async fn post_pacs002( + State(state): State<AppHandle>, + axum::Json(request): axum::Json<Pacs002Document>, +) -> Result<impl IntoResponse, AppError> { + let tx_tp = TransactionType::PACS002.to_string(); + tracing::Span::current().record("tx_tp", &tx_tp); + + let cre_dt_tm = request.f_i_to_f_i_pmt_sts_rpt.grp_hdr.cre_dt_tm; + let end_to_end_id = request.f_i_to_f_i_pmt_sts_rpt.tx_inf_and_sts[0] + .orgnl_end_to_end_id + .as_ref() + .ok_or_else(|| anyhow::anyhow!("end_to_end_id is expected"))?; + tracing::Span::current().record("end_to_end_id", end_to_end_id); + + let msg_id = &request.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id; + tracing::Span::current().record("msg_id", msg_id); + + let pmt_inf_id = &request.f_i_to_f_i_pmt_sts_rpt.tx_inf_and_sts[0].orgnl_instr_id; + let tx_sts = &request.f_i_to_f_i_pmt_sts_rpt.tx_inf_and_sts[0].tx_sts; + + let mut cache = state.services.cache.get().await?; + trace!(end_to_end_id = end_to_end_id, "getting data cache"); + let cache = cache + .get::<_, Vec<u8>>(&end_to_end_id) + .await + .map(|value| DataCache::decode(value.as_ref())); + + let data_cache = match cache { + Ok(Ok(data_cache)) => { + debug!(end_to_end_id = end_to_end_id, "cache hit"); + rebuild_entities( + end_to_end_id, + &state, + Some(data_cache), + ) + .await? + } + _ => { + debug!(end_to_end_id = end_to_end_id, "cache miss"); + rebuild_entities(end_to_end_id, &state, None).await? + } + }; + + let amount = data_cache.instd_amt.as_ref().map(|value| value.value); + + let ccy = data_cache + .instd_amt + .as_ref() + .map(|value| value.ccy.as_str()); + + debug!(%msg_id, %end_to_end_id, "parsed transaction identifiers"); + + let money = if let (Some(amt), Some(ccy)) = (amount, ccy) { + Some(Money::try_from((amt, ccy)).map_err(|_e| anyhow::anyhow!("invalid currency"))?) + } else { + trace!(msg_id, "transaction has no amount or currency"); + None + }; + + let transaction_relationship = TransactionRelationship { + from: data_cache.cdtr_acct_id.to_string(), + to: data_cache.dbtr_acct_id.to_string(), + amt: money, + cre_dt_tm: Some(cre_dt_tm), + end_to_end_id: end_to_end_id.to_string(), + msg_id: msg_id.to_string(), + pmt_inf_id: pmt_inf_id + .as_ref() + .ok_or_else(|| { + error!("missing pmt_inf_id"); + anyhow::anyhow!("missing pmt_inf_id") + })? + .to_string(), + tx_tp: tx_tp.to_string(), + tx_sts: tx_sts.clone(), + ..Default::default() + }; + + debug!(%msg_id, %end_to_end_id, "constructed transaction relationship"); + + // TODO: remove debtor_account_id from create request, use from TR + trace!("updating pseudonyms"); + + let pseudonyms_request = CreatePseudonymRequest { + transaction_relationship: Some(transaction_relationship), + debtor_id: data_cache.dbtr_id.to_string(), + debtor_account_id: data_cache.dbtr_acct_id.to_string(), + creditor_id: data_cache.cdtr_id.to_string(), + creditor_account_id: data_cache.cdtr_acct_id.to_string(), + }; + + let mut pseudonyms_client = state.mutate_pseudonym_client.clone(); + + let pseudonyms_fut = async { + debug!("creating pseudonyms"); + let span = info_span!("create.pseudonyms.account"); + span.set_attribute(attribute::RPC_SERVICE, "pseudonyms"); + pseudonyms_client + .create_pseudonym(pseudonyms_request) + .instrument(span) + .await + .map_err(|e| { + error!(error = %e, "failed to create pseudonyms"); + anyhow::anyhow!("could not create pseudonyms") + }) + }; + + let id = Uuid::now_v7(); + + let tr_fut = async { + let span = info_span!("create.transaction_history.pacs002"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "pacs002"); + + sqlx::query!( + "insert into pacs002 (id, document) values ($1, $2)", + id, + sqlx::types::Json(&request) as _ + ) + .execute(&state.services.postgres) + .instrument(span) + .await + .map_err(|e| { + error!("{e}"); + anyhow::anyhow!("could not insert transaction_history") + }) + }; + let (_result, _resp) = tokio::try_join!(tr_fut, pseudonyms_fut)?; + debug!(%id, %msg_id, %tx_tp, "transaction added to history"); + + trace!(%msg_id, "publishing payload to "); + + let payload = Payload { + tx_tp: tx_tp.to_string(), + data_cache: Some(data_cache), + transaction: Some(warden_core::message::payload::Transaction::Pacs002( + request.clone(), + )), + ..Default::default() + }; + + publish_message(&state, payload, msg_id).await?; + info!(%msg_id, "published transaction to router"); + Ok((axum::http::StatusCode::CREATED, axum::Json(request))) +} + +#[tracing::instrument(skip(state, data_cache))] +async fn rebuild_entities( + end_to_end_id: &str, + state: &AppHandle, + mut data_cache: Option<DataCache>, +) -> anyhow::Result<DataCache> { + let span = info_span!("get.transaction_history.pacs008"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "select"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "pacs008"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, end_to_end_id.to_string()); + tracing::info!(end_to_end = end_to_end_id, "rebuilding cache"); + + let transaction = sqlx::query_as!( + Row, + r#"select id, document as "document: sqlx::types::Json<serde_json::Value>" from pacs008 where exists ( + select 1 + from jsonb_array_elements(document->'f_i_to_f_i_cstmr_cdt_trf'->'cdt_trf_tx_inf') as elem + where elem->'pmt_id'->>'end_to_end_id' = $1 + ) limit 1"#, + end_to_end_id + ) + .fetch_one(&state.services.postgres) + .instrument(span) + .await?; + + debug!(id = ?transaction.id, "found transaction"); + + let document = serde_json::from_value(transaction.document.0)?; + + if data_cache.is_none() { + debug!(e2e_id = end_to_end_id, "attempting to rebuild data cache"); + let data_cache_value = build_data_cache(&document)?; + + set_cache(end_to_end_id, state, &data_cache_value).await?; + + let _old_value = data_cache.replace(data_cache_value); + }; + + data_cache.ok_or_else(|| anyhow::anyhow!("no pacs008 found")) +} diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs index a8c3e8c..64a1029 100644 --- a/crates/warden/src/server/routes/processor/pacs008.rs +++ b/crates/warden/src/server/routes/processor/pacs008.rs @@ -74,14 +74,6 @@ pub(super) async fn post_pacs008( let end_to_end_id = cdt_trf_tx_inf .as_ref() .map(|value| value.pmt_id.end_to_end_id.as_str()) - .ok_or_else(|| anyhow::anyhow!("missing end_to_end_id id"))?; - - let ccy = - cdt_trf_tx_inf.and_then(|value| value.instd_amt.as_ref().map(|value| value.ccy.as_str())); - - let end_to_end_id = cdt_trf_tx_inf - .as_ref() - .map(|value| value.pmt_id.end_to_end_id.as_str()) .ok_or_else(|| { error!("missing end_to_end_id"); anyhow::anyhow!("missing end_to_end_id id") |