diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-10 13:45:11 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-10 13:45:11 +0200 |
commit | dbff6fa4e5684d8636fd46ecadfe5874a253bd49 (patch) | |
tree | 1039d96c2d02fbc6df1b7b218446c3a8acf1c6b5 /crates/pseudonyms/src/state/mutate.rs | |
parent | 8cda165f9d3f108c80a4c9ee10c68a28299cb2d1 (diff) | |
download | warden-dbff6fa4e5684d8636fd46ecadfe5874a253bd49.tar.bz2 warden-dbff6fa4e5684d8636fd46ecadfe5874a253bd49.zip |
feat(pseudonyms): serve api
Diffstat (limited to 'crates/pseudonyms/src/state/mutate.rs')
-rw-r--r-- | crates/pseudonyms/src/state/mutate.rs | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/crates/pseudonyms/src/state/mutate.rs b/crates/pseudonyms/src/state/mutate.rs new file mode 100644 index 0000000..b0d2f7a --- /dev/null +++ b/crates/pseudonyms/src/state/mutate.rs @@ -0,0 +1,190 @@ +use warden_stack::{ + opentelemetry_semantic_conventions::attribute, tracing_opentelemetry::OpenTelemetrySpanExt, +}; +use time::OffsetDateTime; +use tonic::{Request, Response, Status}; +use tracing::{Instrument, info_span, instrument}; +use warden_core::{ + google, + pseudonyms::transaction_relationship::{ + CreatePseudonymRequest, mutate_pseudonym_server::MutatePseudonym, + }, +}; + +use crate::state::AppHandle; + +#[tonic::async_trait] +impl MutatePseudonym for AppHandle { + #[instrument(skip(self, request), err(Debug))] + async fn create_pseudonym( + &self, + request: Request<CreatePseudonymRequest>, + ) -> Result<Response<google::protobuf::Empty>, Status> { + let body = request.into_inner(); + let transaction_relationship = body + .transaction_relationship + .ok_or_else(|| tonic::Status::data_loss("transaction_relationship"))?; + let mut tx = self + .services + .postgres + .begin() + .await + .map_err(|_e| tonic::Status::internal("database is not ready"))?; + + let span = info_span!("create.pseudonyms.account"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_QUERY_TEXT, "insert into account"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "account"); + span.set_attribute( + attribute::DB_QUERY_PARAMETER, + format!("{}{}", body.creditor_account_id, body.debtor_account_id), + ); + + sqlx::query!( + "insert into account (id) + select * from unnest($1::text[]) + on conflict (id) do nothing", + &[ + body.debtor_account_id.to_string(), + body.creditor_account_id.to_string() + ] + ) + .execute(&mut *tx) + .instrument(span) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let span = info_span!("create.pseudonyms.entity"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_QUERY_TEXT, "insert into entity"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "entity"); + span.set_attribute( + attribute::DB_QUERY_PARAMETER, + format!("{}{}", body.creditor_id, body.debtor_id), + ); + let cre_dt_tm = transaction_relationship.cre_dt_tm.expect("cre_dt_tm"); + let cre_dt_tm = OffsetDateTime::try_from(cre_dt_tm).expect("offset date time conv"); + + sqlx::query!( + "insert into entity (id, cre_dt_tm) + select * from unnest($1::text[], $2::timestamptz[]) + on conflict (id) + do update set cre_dt_tm = excluded.cre_dt_tm + ", + &[body.creditor_id.to_string(), body.debtor_id.to_string()], + &[cre_dt_tm, cre_dt_tm] + ) + .execute(&mut *tx) + .instrument(span) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let account_holders = &[ + ( + body.debtor_id.to_string(), + body.creditor_account_id.to_string(), + ), + ( + body.creditor_id.to_string(), + body.creditor_account_id.to_string(), + ), + ]; + let mut deb_holder = vec![]; + let mut cred_holder = vec![]; + let mut dts = vec![]; + + account_holders.iter().for_each(|todo| { + deb_holder.push(todo.0.to_string()); + cred_holder.push(todo.1.to_string()); + dts.push(cre_dt_tm); + }); + + let span = info_span!("create.pseudonyms.account_holder"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_QUERY_TEXT, "insert into account_holder"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "account_holder"); + + sqlx::query!( + "insert into account_holder (source, destination, cre_dt_tm) + select * from unnest($1::text[], $2::text[], $3::timestamptz[]) + on conflict (source, destination) + do update set cre_dt_tm = excluded.cre_dt_tm + ", + &deb_holder, + &cred_holder, + &dts + ) + .execute(&mut *tx) + .instrument(span) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let latlng: Option<(f64, f64)> = transaction_relationship + .latlng + .map(|value| (value.latitude, value.longitude)); + + let span = info_span!("create.pseudonyms.transaction_relationship"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "transaction_relationship"); + span.set_attribute( + attribute::DB_QUERY_TEXT, + "insert into transaction_relationship", + ); + + let amt = transaction_relationship + .amt + .ok_or_else(|| tonic::Status::data_loss("amt"))?; + + sqlx::query!( + " + insert into transaction_relationship ( + source, + destination, + amt_unit, + amt_ccy, + amt_nanos, + cre_dt_tm, + end_to_end_id, + msg_id, + pmt_inf_id, + tx_tp, + lat, + lon, + tx_sts + ) + values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ", + transaction_relationship.from, + transaction_relationship.to, + amt.units, + amt.currency_code, + amt.nanos, + cre_dt_tm, + transaction_relationship.end_to_end_id, + transaction_relationship.msg_id, + transaction_relationship.pmt_inf_id, + transaction_relationship.tx_tp, + latlng.map(|lat| lat.0), + latlng.map(|lat| lat.1), + transaction_relationship.tx_sts, + ) + .execute(&mut *tx) + .instrument(span) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let span = info_span!("transaction.commit"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "commit"); + + tx.commit() + .instrument(span) + .await + .map_err(|_e| tonic::Status::internal("database is not ready"))?; + Ok(Response::new(google::protobuf::Empty::default())) + } +} |