diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-10 18:07:13 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-10 18:07:13 +0200 |
commit | a9aef1863ff0fe0422eee403f85644fc76952e80 (patch) | |
tree | a233ca1ca7fb38d06592fa7296382b5e9b478d62 | |
parent | 8b4f27d2c39d1e1f5f1cc455c58800e806ee98d0 (diff) | |
download | warden-a9aef1863ff0fe0422eee403f85644fc76952e80.tar.bz2 warden-a9aef1863ff0fe0422eee403f85644fc76952e80.zip |
feat(warden): save history
-rw-r--r-- | Cargo.lock | 8 | ||||
-rw-r--r-- | Cargo.toml | 6 | ||||
-rw-r--r-- | crates/pseudonyms/Cargo.toml | 5 | ||||
-rw-r--r-- | crates/pseudonyms/src/main.rs | 9 | ||||
-rw-r--r-- | crates/pseudonyms/src/server/interceptor.rs | 4 | ||||
-rw-r--r-- | crates/pseudonyms/src/state.rs | 6 | ||||
-rw-r--r-- | crates/pseudonyms/src/state/mutate.rs | 6 | ||||
-rw-r--r-- | crates/pseudonyms/tests/helpers.rs | 4 | ||||
-rw-r--r-- | crates/warden/Cargo.toml | 15 | ||||
-rw-r--r-- | crates/warden/migrations/20250810160034_transaction_history.sql | 38 | ||||
-rw-r--r-- | crates/warden/src/cnfg.rs | 2 | ||||
-rw-r--r-- | crates/warden/src/main.rs | 27 | ||||
-rw-r--r-- | crates/warden/src/server.rs | 1 | ||||
-rw-r--r-- | crates/warden/src/server/grpc.rs | 28 | ||||
-rw-r--r-- | crates/warden/src/server/routes/processor/pacs008.rs | 92 | ||||
-rw-r--r-- | crates/warden/src/state.rs | 36 | ||||
-rw-r--r-- | crates/warden/warden.toml | 23 | ||||
-rw-r--r-- | lib/warden-core/build.rs | 6 | ||||
-rw-r--r-- | lib/warden-core/src/lib.rs | 3 |
19 files changed, 280 insertions, 39 deletions
@@ -2838,6 +2838,7 @@ dependencies = [ "tokio-stream", "tracing", "url", + "uuid", "webpki-roots 0.26.11", ] @@ -2919,6 +2920,7 @@ dependencies = [ "thiserror 2.0.12", "time", "tracing", + "uuid", "whoami", ] @@ -2957,6 +2959,7 @@ dependencies = [ "thiserror 2.0.12", "time", "tracing", + "uuid", "whoami", ] @@ -2983,6 +2986,7 @@ dependencies = [ "time", "tracing", "url", + "uuid", ] [[package]] @@ -3788,10 +3792,13 @@ dependencies = [ "config", "metrics", "metrics-exporter-prometheus", + "prost 0.14.1", "serde", "serde_json", + "sqlx", "time", "tokio", + "tonic 0.14.0", "tower", "tower-http", "tracing", @@ -3801,6 +3808,7 @@ dependencies = [ "utoipa-redoc", "utoipa-scalar", "utoipa-swagger-ui", + "uuid", "warden-core", "warden-stack", ] @@ -43,5 +43,11 @@ utoipa-rapidoc = "6.0.0" utoipa-redoc = "6.0.0" utoipa-scalar = "0.3.0" utoipa-swagger-ui = "9.0.2" +uuid = "1.17.0" warden-core = { path = "lib/warden-core" } warden-stack = { path = "lib/warden-stack" } + +[profile.release] +lto = true +strip = true +codegen-units = 1 diff --git a/crates/pseudonyms/Cargo.toml b/crates/pseudonyms/Cargo.toml index 9f06188..1efd5c5 100644 --- a/crates/pseudonyms/Cargo.toml +++ b/crates/pseudonyms/Cargo.toml @@ -32,8 +32,3 @@ warden-core = { workspace = true, features = ["pseudonyms", "serde-time"] } [dependencies.warden-stack] workspace = true features = ["api", "cache", "postgres", "opentelemetry-tonic", "tracing-loki"] - -[profile.release] -lto = true -strip = true -codegen-units = 1 diff --git a/crates/pseudonyms/src/main.rs b/crates/pseudonyms/src/main.rs index 7b67557..127cb2a 100644 --- a/crates/pseudonyms/src/main.rs +++ b/crates/pseudonyms/src/main.rs @@ -1,8 +1,8 @@ use clap::Parser; -use warden_stack::{Configuration, Services, tracing::Tracing}; use std::sync::Arc; use tracing::error; use warden_pseudonyms::state::{AppHandle, AppState}; +use warden_stack::{Configuration, Services, tracing::Tracing}; /// warden-pseudonyms #[derive(Parser, Debug)] @@ -47,18 +47,17 @@ async fn main() -> anyhow::Result<()> { .inspect_err(|e| error!("cache: {e}"))? .build(); - let postgres = services + let postgres = services .postgres .take() .ok_or_else(|| anyhow::anyhow!("database is not ready"))?; - let cache = services + let cache = services .cache .take() .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; - let services = warden_pseudonyms::state::Services { postgres , cache}; - + let services = warden_pseudonyms::state::Services { postgres, cache }; let state = AppState::new(services, config, Some(provider))?; diff --git a/crates/pseudonyms/src/server/interceptor.rs b/crates/pseudonyms/src/server/interceptor.rs index f30f140..eeb36c2 100644 --- a/crates/pseudonyms/src/server/interceptor.rs +++ b/crates/pseudonyms/src/server/interceptor.rs @@ -1,9 +1,9 @@ +use tonic::{Status, service::Interceptor}; +use tracing::Span; use warden_stack::{ opentelemetry::global, tracing::telemetry::tonic::extractor, tracing_opentelemetry::OpenTelemetrySpanExt, }; -use tonic::{Status, service::Interceptor}; -use tracing::Span; #[derive(Clone, Copy)] pub struct MyInterceptor; diff --git a/crates/pseudonyms/src/state.rs b/crates/pseudonyms/src/state.rs index 349c324..82f025f 100644 --- a/crates/pseudonyms/src/state.rs +++ b/crates/pseudonyms/src/state.rs @@ -1,11 +1,13 @@ mod mutate; use std::{ - net::{Ipv6Addr, SocketAddr}, ops::Deref, sync::Arc + net::{Ipv6Addr, SocketAddr}, + ops::Deref, + sync::Arc, }; use sqlx::PgPool; -use warden_stack::{cache::RedisManager, tracing::SdkTracerProvider, Configuration}; +use warden_stack::{Configuration, cache::RedisManager, tracing::SdkTracerProvider}; use crate::AppConfig; diff --git a/crates/pseudonyms/src/state/mutate.rs b/crates/pseudonyms/src/state/mutate.rs index b0d2f7a..6a737b5 100644 --- a/crates/pseudonyms/src/state/mutate.rs +++ b/crates/pseudonyms/src/state/mutate.rs @@ -1,6 +1,3 @@ -use warden_stack::{ - opentelemetry_semantic_conventions::attribute, tracing_opentelemetry::OpenTelemetrySpanExt, -}; use time::OffsetDateTime; use tonic::{Request, Response, Status}; use tracing::{Instrument, info_span, instrument}; @@ -10,6 +7,9 @@ use warden_core::{ CreatePseudonymRequest, mutate_pseudonym_server::MutatePseudonym, }, }; +use warden_stack::{ + opentelemetry_semantic_conventions::attribute, tracing_opentelemetry::OpenTelemetrySpanExt, +}; use crate::state::AppHandle; diff --git a/crates/pseudonyms/tests/helpers.rs b/crates/pseudonyms/tests/helpers.rs index 9f512df..589049a 100644 --- a/crates/pseudonyms/tests/helpers.rs +++ b/crates/pseudonyms/tests/helpers.rs @@ -1,9 +1,9 @@ use sqlx::PgPool; -use warden_stack::{Configuration, cache::RedisManager}; use tokio::sync::oneshot; use tonic::transport::Channel; use warden_core::pseudonyms::transaction_relationship::mutate_pseudonym_client::MutatePseudonymClient; use warden_pseudonyms::state::{AppHandle, AppState, Services}; +use warden_stack::{Configuration, cache::RedisManager}; use std::sync::Arc; @@ -34,7 +34,7 @@ impl TestApp { cache, }; - let state = AppHandle(Arc::new(AppState::new(services, config, None).unwrap())); + let state = AppHandle(Arc::new(AppState::new(services, config, None).unwrap())); dbg!(&state.addr.port()); diff --git a/crates/warden/Cargo.toml b/crates/warden/Cargo.toml index dde983b..34da6f3 100644 --- a/crates/warden/Cargo.toml +++ b/crates/warden/Cargo.toml @@ -14,10 +14,22 @@ clap = { workspace = true, features = ["derive"] } config = { workspace = true, features = ["convert-case", "toml"] } metrics.workspace = true metrics-exporter-prometheus.workspace = true +prost.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true +sqlx = { workspace = true, features = [ + "json", + "macros", + "migrate", + "postgres", + "runtime-tokio", + "time", + "tls-rustls", + "uuid", +] } time.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } +tonic.workspace = true tower-http = { workspace = true, features = [ "timeout", "trace", @@ -30,6 +42,7 @@ utoipa-rapidoc = { workspace = true, optional = true } utoipa-redoc = { workspace = true, optional = true } utoipa-scalar = { workspace = true, optional = true } utoipa-swagger-ui = { workspace = true, optional = true } +uuid = { workspace = true, features = ["v7"] } warden-core = { workspace = true, features = ["message", "pseudonyms", "serde", "openapi"] } [features] @@ -44,4 +57,4 @@ tower = { workspace = true, features = ["util"] } [dependencies.warden-stack] workspace = true -features = ["api", "opentelemetry", "tracing-loki"] +features = ["api", "cache", "opentelemetry", "postgres", "tracing-loki"] diff --git a/crates/warden/migrations/20250810160034_transaction_history.sql b/crates/warden/migrations/20250810160034_transaction_history.sql new file mode 100644 index 0000000..e75e441 --- /dev/null +++ b/crates/warden/migrations/20250810160034_transaction_history.sql @@ -0,0 +1,38 @@ +create table pacs002 ( + id uuid primary key, + document jsonb not null, + created_at timestamptz default now(), + processed boolean, + + message_id text generated always as ( + document->'f_i_to_f_i_pmt_sts_rpt'->'grp_hdr'->>'msg_id' + ) stored, + + end_to_end_id text generated always as ( + document->'f_i_to_f_i_pmt_sts_rpt'->'tx_inf_and_sts'->0->>'orgnl_end_to_end_id' + ) stored, + + constraint unique_msgid_e2eid_pacs002 unique (message_id, end_to_end_id), + + constraint message_id_not_null check (message_id is not null), + constraint end_to_end_id_not_null check (end_to_end_id is not null) +); + +create table pacs008 ( + id uuid primary key, + document jsonb not null, + created_at timestamptz default now(), + processed boolean, + + message_id text generated always as ( + document->'f_i_to_f_i_cstmr_cdt_trf'->'grp_hdr'->>'msg_id' + ) stored, + + end_to_end_id text generated always as ( + document->'f_i_to_f_i_cstmr_cdt_trf'->'cdt_trf_tx_inf'->0->'pmt_id'->>'end_to_end_id' + ) stored, + + constraint unique_msgid_e2eid_pacs008 unique (message_id, end_to_end_id), + constraint message_id_not_null check (message_id is not null), + constraint end_to_end_id_not_null check (end_to_end_id is not null) +); diff --git a/crates/warden/src/cnfg.rs b/crates/warden/src/cnfg.rs index f3fa016..53af683 100644 --- a/crates/warden/src/cnfg.rs +++ b/crates/warden/src/cnfg.rs @@ -4,4 +4,6 @@ use serde::Deserialize; #[serde(rename_all = "kebab-case")] pub struct LocalConfig { pub cache_ttl: u64, + #[serde(rename = "pseudonyms-endpoint")] + pub pseudonyms_endpoint: std::sync::Arc<str>, } diff --git a/crates/warden/src/main.rs b/crates/warden/src/main.rs index 68b185c..f551664 100644 --- a/crates/warden/src/main.rs +++ b/crates/warden/src/main.rs @@ -7,8 +7,8 @@ mod version; use std::net::{Ipv6Addr, SocketAddr}; use clap::{Parser, command}; -use warden_stack::{Configuration, tracing::Tracing}; -use tracing::info; +use tracing::{error, info}; +use warden_stack::{Configuration, Services, tracing::Tracing}; use crate::state::AppState; @@ -44,7 +44,28 @@ async fn main() -> Result<(), error::AppError> { tokio::spawn(tracing.loki_task); - let state = AppState::create(&config).await?; + let mut services = Services::builder() + .postgres(&config.database) + .await + .inspect_err(|e| error!("database: {e}"))? + .cache(&config.cache) + .await + .inspect_err(|e| error!("cache: {e}"))? + .build(); + + let postgres = services + .postgres + .take() + .ok_or_else(|| anyhow::anyhow!("database is not ready"))?; + + let cache = services + .cache + .take() + .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + + let services = state::Services { postgres, cache }; + + let state = AppState::create(services, &config).await?; let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port)); diff --git a/crates/warden/src/server.rs b/crates/warden/src/server.rs index 6712516..032fe95 100644 --- a/crates/warden/src/server.rs +++ b/crates/warden/src/server.rs @@ -1,3 +1,4 @@ +pub mod grpc; mod middleware; mod routes; diff --git a/crates/warden/src/server/grpc.rs b/crates/warden/src/server/grpc.rs new file mode 100644 index 0000000..f239ddb --- /dev/null +++ b/crates/warden/src/server/grpc.rs @@ -0,0 +1,28 @@ +pub mod interceptor { + use tonic::{ + Status, + service::{Interceptor, interceptor::InterceptedService}, + transport::Channel, + }; + use tracing::Span; + use warden_stack::{ + opentelemetry::global, tracing::telemetry::tonic::injector, + tracing_opentelemetry::OpenTelemetrySpanExt, + }; + + pub type Intercepted = InterceptedService<Channel, MyInterceptor>; + + #[derive(Clone, Copy)] + pub struct MyInterceptor; + + impl Interceptor for MyInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + let cx = Span::current().context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::MetadataMap(request.metadata_mut())) + }); + + Ok(request) + } + } +} diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs index 3efc7f1..2478cf0 100644 --- a/crates/warden/src/server/routes/processor/pacs008.rs +++ b/crates/warden/src/server/routes/processor/pacs008.rs @@ -1,10 +1,16 @@ use axum::{extract::State, response::IntoResponse}; -use warden_stack::tracing_opentelemetry::OpenTelemetrySpanExt; -use tracing::{debug, error, trace, warn}; +use std::sync::Arc; +use tracing::{Instrument, Span, debug, error, info_span, instrument, trace, warn}; +use uuid::Uuid; use warden_core::{ google::r#type::Money, - iso20022::{pacs008::Pacs008Document, TransactionType}, - message::DataCache, pseudonyms::transaction_relationship::{CreatePseudonymRequest, TransactionRelationship}, + iso20022::{TransactionType, pacs008::Pacs008Document}, + message::DataCache, + pseudonyms::transaction_relationship::{CreatePseudonymRequest, TransactionRelationship}, +}; +use warden_stack::{ + opentelemetry_semantic_conventions::attribute, redis::AsyncCommands, + tracing_opentelemetry::OpenTelemetrySpanExt, }; use crate::{error::AppError, server::routes::PACS008_001_12, state::AppHandle, version::Version}; @@ -111,7 +117,7 @@ pub(super) async fn post_pacs008( ..Default::default() }; - let request = CreatePseudonymRequest { + let request = CreatePseudonymRequest { transaction_relationship: Some(transaction_relationship), debtor_id: data_cache.dbtr_id.to_string(), debtor_account_id: data_cache.dbtr_acct_id.to_string(), @@ -119,9 +125,59 @@ pub(super) async fn post_pacs008( creditor_account_id: data_cache.cdtr_acct_id.to_string(), }; - debug!(%msg_id, %end_to_end_id, "constructed transaction relationship"); + let mut pseudonyms_client = state.mutate_pseudonym_client.clone(); + + trace!("updating pseudonyms"); + + let pseudonyms_fut = async { + let span = info_span!("create.pseudonyms.account"); + span.set_attribute(attribute::RPC_SERVICE, "pseudonyms"); + pseudonyms_client + .create_pseudonym(request) + .instrument(span) + .await + .map_err(|e| { + error!(error = %e, "failed to create pseudonyms"); + anyhow::anyhow!("could not create pseudonyms") + }) + }; + + let (_, _) = tokio::try_join!( + pseudonyms_fut, + set_cache(&end_to_end_id, &state, &data_cache) + )?; + trace!("pseudonyms saved"); + + let id = Uuid::now_v7(); + debug!(%id, "inserting transaction into history"); + + let span = info_span!("create.transaction_history.pacs008"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "pacs008"); + + trace!(id = ?id, "saving transaction history"); + sqlx::query!( + "insert into pacs008 (id, document) values ($1, $2)", + id, + sqlx::types::Json(&transaction) as _ + ) + .execute(&state.services.postgres) + .instrument(span) + .await?; + debug!(%id, %msg_id, "transaction added to history"); + + let payload = warden_core::message::Payload { + tx_tp: tx_tp.to_string(), + transaction: Some(warden_core::message::payload::Transaction::Pacs008( + transaction.clone(), + )), + data_cache: Some(data_cache), + ..Default::default() + }; + Ok(String::default()) } @@ -230,3 +286,27 @@ pub fn build_data_cache(transaction: &Pacs008Document) -> anyhow::Result<DataCac Ok(data_cache) } + +#[instrument(skip(state), fields(end_to_end_id = end_to_end_id))] +pub async fn set_cache( + end_to_end_id: &str, + state: &AppHandle, + data_cache: &DataCache, +) -> anyhow::Result<()> { + trace!("updating cache"); + let span = Span::current(); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "set"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, end_to_end_id.to_string()); + let mut cache_update = state.services.cache.get().await?; + let bytes = prost::Message::encode_to_vec(data_cache); + cache_update + .set_ex::<_, _, ()>(&end_to_end_id, bytes, state.app_config.cache_ttl) + .await + .map_err(|e| { + error!("cache: {e}"); + anyhow::anyhow!("internal server error") + })?; + + Ok(()) +} diff --git a/crates/warden/src/state.rs b/crates/warden/src/state.rs index 8e5b182..eac014a 100644 --- a/crates/warden/src/state.rs +++ b/crates/warden/src/state.rs @@ -1,7 +1,15 @@ -use warden_stack::{Configuration, Environment}; +use sqlx::PgPool; use std::{ops::Deref, sync::Arc}; +use tonic::transport::Endpoint; +use tracing::error; +use warden_core::pseudonyms::transaction_relationship::mutate_pseudonym_client::MutatePseudonymClient; +use warden_stack::{Configuration, Environment, cache::RedisManager}; -use crate::{cnfg::LocalConfig, error::AppError}; +use crate::{ + cnfg::LocalConfig, + error::AppError, + server::grpc::interceptor::{Intercepted, MyInterceptor}, +}; #[derive(Clone)] pub struct AppHandle(Arc<AppState>); @@ -15,18 +23,38 @@ impl Deref for AppHandle { } #[derive(Clone)] -pub struct Services {} +pub struct Services { + pub postgres: PgPool, + pub cache: RedisManager, +} pub struct AppState { pub environment: Environment, + pub mutate_pseudonym_client: MutatePseudonymClient<Intercepted>, + pub services: Services, + pub app_config: LocalConfig, } impl AppState { - pub async fn create(configuration: &Configuration) -> Result<AppHandle, AppError> { + pub async fn create( + services: Services, + configuration: &Configuration, + ) -> Result<AppHandle, AppError> { let local_config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + let channel = Endpoint::new(local_config.pseudonyms_endpoint.to_string())? + .connect() + .await + .inspect_err(|e| error!("could not connect to pseudonyms service: {e}"))?; + + let mutate_pseudonym_client = + MutatePseudonymClient::with_interceptor(channel, MyInterceptor); + Ok(AppHandle(Arc::new(Self { environment: configuration.application.env, + mutate_pseudonym_client, + services, + app_config: local_config, }))) } } diff --git a/crates/warden/warden.toml b/crates/warden/warden.toml index 92ad599..00f76b8 100644 --- a/crates/warden/warden.toml +++ b/crates/warden/warden.toml @@ -4,10 +4,33 @@ port = 2210 [misc] cache-ttl = 1000 +pseudonyms-endpoint = "http://localhost:1610" [monitoring] log-level = "warden=trace,info" opentelemetry-endpoint = "http://localhost:4317" loki-endpoint = "http://localhost:3100" +[database] +pool_size = 100 +port = 5432 +name = "transaction_history" +host = "localhost" +password = "password" +user = "postgres" + +[cache] +dsn = "redis://localhost:6379" +pooled = true +type = "non-clustered" # clustered, non-clustered or sentinel +max-connections = 100 + +[cache.sentinel] +master-name = "mymaster" +nodes = [ + { host = "127.0.0.1", port = 26379 }, + { host = "127.0.0.2", port = 26379 }, + { host = "127.0.0.3", port = 26379 }, +] + # vim:ft=toml diff --git a/lib/warden-core/build.rs b/lib/warden-core/build.rs index 6d5efbb..1f71f35 100644 --- a/lib/warden-core/build.rs +++ b/lib/warden-core/build.rs @@ -13,9 +13,7 @@ impl Entity { #[cfg(feature = "message")] fn iso20022_protos() -> Vec<&'static str> { - vec![ - "proto/warden_message.proto", - ] + vec!["proto/warden_message.proto"] } #[cfg(feature = "pseudonyms")] @@ -45,7 +43,7 @@ impl Entity { fn main() -> Result<(), Box<dyn std::error::Error>> { println!("cargo:rerun-if-changed=../../proto"); -#[cfg(any(feature = "message", feature = "pseudonyms"))] + #[cfg(any(feature = "message", feature = "pseudonyms"))] let mut protos: Vec<&'static str> = vec![]; #[cfg(feature = "message")] diff --git a/lib/warden-core/src/lib.rs b/lib/warden-core/src/lib.rs index 53f25f2..d039516 100644 --- a/lib/warden-core/src/lib.rs +++ b/lib/warden-core/src/lib.rs @@ -8,8 +8,7 @@ /// Type file descriptor #[cfg(any(feature = "message", feature = "pseudonyms"))] -pub const FILE_DESCRIPTOR_SET: &[u8] = - tonic::include_file_descriptor_set!("warden_descriptor"); +pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("warden_descriptor"); /// Google well known types #[allow(missing_docs)] |