diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-10 13:45:11 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-10 13:45:11 +0200 |
commit | dbff6fa4e5684d8636fd46ecadfe5874a253bd49 (patch) | |
tree | 1039d96c2d02fbc6df1b7b218446c3a8acf1c6b5 | |
parent | 8cda165f9d3f108c80a4c9ee10c68a28299cb2d1 (diff) | |
download | warden-dbff6fa4e5684d8636fd46ecadfe5874a253bd49.tar.bz2 warden-dbff6fa4e5684d8636fd46ecadfe5874a253bd49.zip |
feat(pseudonyms): serve api
-rw-r--r-- | Cargo.lock | 8 | ||||
-rw-r--r-- | crates/pseudonyms/Cargo.toml | 11 | ||||
-rw-r--r-- | crates/pseudonyms/migrations/20250810112925_account.sql | 3 | ||||
-rw-r--r-- | crates/pseudonyms/migrations/20250810112930_entity.sql | 4 | ||||
-rw-r--r-- | crates/pseudonyms/migrations/20250810112936_account_holder.sql | 6 | ||||
-rw-r--r-- | crates/pseudonyms/migrations/20250810112945_transaction_relationship.sql | 29 | ||||
-rw-r--r-- | crates/pseudonyms/pseudonyms.toml | 25 | ||||
-rw-r--r-- | crates/pseudonyms/src/lib.rs | 23 | ||||
-rw-r--r-- | crates/pseudonyms/src/main.rs | 70 | ||||
-rw-r--r-- | crates/pseudonyms/src/server.rs | 65 | ||||
-rw-r--r-- | crates/pseudonyms/src/server/interceptor.rs | 23 | ||||
-rw-r--r-- | crates/pseudonyms/src/state.rs | 56 | ||||
-rw-r--r-- | crates/pseudonyms/src/state/mutate.rs | 190 | ||||
-rw-r--r-- | lib/warden-stack/Cargo.toml | 2 |
14 files changed, 511 insertions, 4 deletions
@@ -2827,15 +2827,18 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", + "rustls", "serde", "serde_json", "sha2", "smallvec", "thiserror 2.0.12", + "time", "tokio", "tokio-stream", "tracing", "url", + "webpki-roots 0.26.11", ] [[package]] @@ -2914,6 +2917,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror 2.0.12", + "time", "tracing", "whoami", ] @@ -2951,6 +2955,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror 2.0.12", + "time", "tracing", "whoami", ] @@ -2975,6 +2980,7 @@ dependencies = [ "serde_urlencoded", "sqlx-core", "thiserror 2.0.12", + "time", "tracing", "url", ] @@ -3825,8 +3831,10 @@ dependencies = [ "metrics-exporter-prometheus", "serde", "serde_json", + "sqlx", "time", "tokio", + "tonic 0.14.0", "tracing", "warden-core", "warden-stack", diff --git a/crates/pseudonyms/Cargo.toml b/crates/pseudonyms/Cargo.toml index 02ecddc..1efd5c5 100644 --- a/crates/pseudonyms/Cargo.toml +++ b/crates/pseudonyms/Cargo.toml @@ -15,11 +15,20 @@ metrics.workspace = true metrics-exporter-prometheus.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true +sqlx = { workspace = true, features = [ + "macros", + "migrate", + "postgres", + "runtime-tokio", + "time", + "tls-rustls", +] } time.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } +tonic.workspace = true tracing.workspace = true warden-core = { workspace = true, features = ["pseudonyms", "serde-time"] } [dependencies.warden-stack] workspace = true -features = ["api", "cache", "postgres", "opentelemetry", "tracing-loki"] +features = ["api", "cache", "postgres", "opentelemetry-tonic", "tracing-loki"] diff --git a/crates/pseudonyms/migrations/20250810112925_account.sql b/crates/pseudonyms/migrations/20250810112925_account.sql new file mode 100644 index 0000000..ec5ffbb --- /dev/null +++ b/crates/pseudonyms/migrations/20250810112925_account.sql @@ -0,0 +1,3 @@ +create table account ( + id varchar primary key +); diff --git a/crates/pseudonyms/migrations/20250810112930_entity.sql b/crates/pseudonyms/migrations/20250810112930_entity.sql new file mode 100644 index 0000000..7edce20 --- /dev/null +++ b/crates/pseudonyms/migrations/20250810112930_entity.sql @@ -0,0 +1,4 @@ +create table entity ( + id varchar primary key, + cre_dt_tm timestamptz not null +); diff --git a/crates/pseudonyms/migrations/20250810112936_account_holder.sql b/crates/pseudonyms/migrations/20250810112936_account_holder.sql new file mode 100644 index 0000000..7d7c844 --- /dev/null +++ b/crates/pseudonyms/migrations/20250810112936_account_holder.sql @@ -0,0 +1,6 @@ +create table account_holder ( + source varchar references entity(id), + destination varchar references account(id), + cre_dt_tm timestamptz not null, + primary key (source, destination) +); diff --git a/crates/pseudonyms/migrations/20250810112945_transaction_relationship.sql b/crates/pseudonyms/migrations/20250810112945_transaction_relationship.sql new file mode 100644 index 0000000..b28b61c --- /dev/null +++ b/crates/pseudonyms/migrations/20250810112945_transaction_relationship.sql @@ -0,0 +1,29 @@ +create table transaction_relationship ( + source varchar references account(id), + destination varchar references account(id), + amt_unit bigint not null, + amt_ccy varchar(3) not null, + amt_nanos integer not null, + cre_dt_tm timestamptz not null, + end_to_end_id varchar not null check (trim(end_to_end_id) <> ''), + msg_id varchar not null check (trim(msg_id) <> ''), + pmt_inf_id varchar not null check (trim(pmt_inf_id) <> ''), + tx_tp varchar not null check (trim(tx_tp) <> ''), + lat float8, + lon float8, + tx_sts varchar, + primary key (msg_id, end_to_end_id, tx_tp, pmt_inf_id) +); + +create index idx_transaction_status_range + on transaction_relationship (tx_tp, tx_sts, cre_dt_tm desc); + +create index idx_transaction_e2eid_tp_sts + on transaction_relationship (end_to_end_id, tx_tp, tx_sts); + +create index idx_transaction_accc_only + on transaction_relationship (cre_dt_tm) + where tx_sts = 'ACCC'; + +create index idx_transaction_source_time + on transaction_relationship (source, cre_dt_tm desc); diff --git a/crates/pseudonyms/pseudonyms.toml b/crates/pseudonyms/pseudonyms.toml new file mode 100644 index 0000000..ec8a5a9 --- /dev/null +++ b/crates/pseudonyms/pseudonyms.toml @@ -0,0 +1,25 @@ +[application] +env = "development" +port = 1610 + +[monitoring] +log-level = "warden_pseudonyms=trace,info" +opentelemetry-endpoint = "http://localhost:4317" +loki-endpoint = "http://localhost:3100" + +[misc] +something = "http://localhost:8080" + +[database] +pool_size = 100 +port = 5432 +name = "pseudonyms" +host = "localhost" +user = "postgres" +password = "password" + +[cache] +dsn = "redis://localhost:6379" +pooled = true +type = "non-clustered" # clustered, non-clustered or sentinel +max-connections = 100 diff --git a/crates/pseudonyms/src/lib.rs b/crates/pseudonyms/src/lib.rs new file mode 100644 index 0000000..9d76245 --- /dev/null +++ b/crates/pseudonyms/src/lib.rs @@ -0,0 +1,23 @@ +pub mod server; +pub mod state; + +use std::sync::Arc; + +use serde::Deserialize; +use state::AppHandle; +use tracing::{debug, trace}; + +#[derive(Deserialize, Clone)] +pub struct AppConfig { + pub something: Arc<str>, +} + +pub async fn run(state: AppHandle, tx: tokio::sync::oneshot::Sender<u16>) -> anyhow::Result<()> { + trace!("running migrations"); + sqlx::migrate!("./migrations") + .run(&state.services.postgres) + .await?; + debug!("ran migrations"); + + server::serve(state, tx).await +} diff --git a/crates/pseudonyms/src/main.rs b/crates/pseudonyms/src/main.rs index e7a11a9..7b67557 100644 --- a/crates/pseudonyms/src/main.rs +++ b/crates/pseudonyms/src/main.rs @@ -1,3 +1,69 @@ -fn main() { - println!("Hello, world!"); +use clap::Parser; +use warden_stack::{Configuration, Services, tracing::Tracing}; +use std::sync::Arc; +use tracing::error; +use warden_pseudonyms::state::{AppHandle, AppState}; + +/// warden-pseudonyms +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option<std::path::PathBuf>, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + let config = include_str!("../pseudonyms.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder() + .opentelemetry(&config.application, &config.monitoring)? + .loki(&config.application, &config.monitoring)? + .build(&config.monitoring); + + let provider = tracing.otel_provider; + + tokio::spawn(tracing.loki_task); + + let mut services = Services::builder() + .postgres(&config.database) + .await + .inspect_err(|e| error!("database: {e}"))? + .cache(&config.cache) + .await + .inspect_err(|e| error!("cache: {e}"))? + .build(); + + let postgres = services + .postgres + .take() + .ok_or_else(|| anyhow::anyhow!("database is not ready"))?; + + let cache = services + .cache + .take() + .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + + let services = warden_pseudonyms::state::Services { postgres , cache}; + + + let state = AppState::new(services, config, Some(provider))?; + + let (tx, _rx) = tokio::sync::oneshot::channel(); + warden_pseudonyms::run(AppHandle(Arc::new(state)), tx) + .await + .inspect_err(|e| error!("{e}")) } diff --git a/crates/pseudonyms/src/server.rs b/crates/pseudonyms/src/server.rs new file mode 100644 index 0000000..88c46b4 --- /dev/null +++ b/crates/pseudonyms/src/server.rs @@ -0,0 +1,65 @@ +mod interceptor; +use interceptor::MyInterceptor; +use tokio::signal; +use warden_core::pseudonyms::transaction_relationship::mutate_pseudonym_server::MutatePseudonymServer; + +use tonic::transport::{Server, server::TcpIncoming}; +use tracing::info; + +use crate::state::AppHandle; + +pub async fn serve(state: AppHandle, tx: tokio::sync::oneshot::Sender<u16>) -> anyhow::Result<()> { + let listener = tokio::net::TcpListener::bind(state.addr).await?; + + let socket_addr = listener + .local_addr() + .expect("should get socket_addr from listener"); + + tx.send(socket_addr.port()) + .expect("port channel to be open"); + + info!(addr = ?socket_addr, "starting server"); + + Server::builder() + .trace_fn(|_| tracing::info_span!(env!("CARGO_PKG_NAME"))) + // .add_service(QueryUsersServer::new(state.clone())) + .add_service(MutatePseudonymServer::with_interceptor( + state.clone(), + MyInterceptor, + )) + .serve_with_incoming_shutdown(TcpIncoming::from(listener), shutdown_signal(state)) + .await?; + + Ok(()) +} +async fn shutdown_signal(state: AppHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + if let Some(ref provider) = state.tracer_provider { + let _ = provider.shutdown(); + } + }, + _ = terminate => { + if let Some(ref provider) = state.tracer_provider { + let _ = provider.shutdown(); + } + }, + } +} diff --git a/crates/pseudonyms/src/server/interceptor.rs b/crates/pseudonyms/src/server/interceptor.rs new file mode 100644 index 0000000..f30f140 --- /dev/null +++ b/crates/pseudonyms/src/server/interceptor.rs @@ -0,0 +1,23 @@ +use warden_stack::{ + opentelemetry::global, tracing::telemetry::tonic::extractor, + tracing_opentelemetry::OpenTelemetrySpanExt, +}; +use tonic::{Status, service::Interceptor}; +use tracing::Span; + +#[derive(Clone, Copy)] +pub struct MyInterceptor; + +impl Interceptor for MyInterceptor { + fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + let span = Span::current(); + + let cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor::MetadataMap(request.metadata())) + }); + + span.set_parent(cx); + + Ok(request) + } +} diff --git a/crates/pseudonyms/src/state.rs b/crates/pseudonyms/src/state.rs new file mode 100644 index 0000000..349c324 --- /dev/null +++ b/crates/pseudonyms/src/state.rs @@ -0,0 +1,56 @@ +mod mutate; + +use std::{ + net::{Ipv6Addr, SocketAddr}, ops::Deref, sync::Arc +}; + +use sqlx::PgPool; +use warden_stack::{cache::RedisManager, tracing::SdkTracerProvider, Configuration}; + +use crate::AppConfig; + +#[derive(Clone)] +pub struct AppHandle(pub Arc<AppState>); + +impl Deref for AppHandle { + type Target = Arc<AppState>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[derive(Clone)] +pub struct Services { + pub postgres: PgPool, + pub cache: RedisManager, +} + +#[derive(Clone)] +pub struct AppState { + pub addr: SocketAddr, + pub services: Services, + pub config: Configuration, + pub app_config: AppConfig, + pub tracer_provider: Option<SdkTracerProvider>, +} + +impl AppState { + pub fn new( + services: Services, + config: Configuration, + tracer_provider: Option<SdkTracerProvider>, + ) -> anyhow::Result<Self> { + let listen_address = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port)); + + let app_config: AppConfig = serde_json::from_value(config.misc.clone())?; + + Ok(Self { + addr: listen_address, + services, + config, + tracer_provider, + app_config, + }) + } +} diff --git a/crates/pseudonyms/src/state/mutate.rs b/crates/pseudonyms/src/state/mutate.rs new file mode 100644 index 0000000..b0d2f7a --- /dev/null +++ b/crates/pseudonyms/src/state/mutate.rs @@ -0,0 +1,190 @@ +use warden_stack::{ + opentelemetry_semantic_conventions::attribute, tracing_opentelemetry::OpenTelemetrySpanExt, +}; +use time::OffsetDateTime; +use tonic::{Request, Response, Status}; +use tracing::{Instrument, info_span, instrument}; +use warden_core::{ + google, + pseudonyms::transaction_relationship::{ + CreatePseudonymRequest, mutate_pseudonym_server::MutatePseudonym, + }, +}; + +use crate::state::AppHandle; + +#[tonic::async_trait] +impl MutatePseudonym for AppHandle { + #[instrument(skip(self, request), err(Debug))] + async fn create_pseudonym( + &self, + request: Request<CreatePseudonymRequest>, + ) -> Result<Response<google::protobuf::Empty>, Status> { + let body = request.into_inner(); + let transaction_relationship = body + .transaction_relationship + .ok_or_else(|| tonic::Status::data_loss("transaction_relationship"))?; + let mut tx = self + .services + .postgres + .begin() + .await + .map_err(|_e| tonic::Status::internal("database is not ready"))?; + + let span = info_span!("create.pseudonyms.account"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_QUERY_TEXT, "insert into account"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "account"); + span.set_attribute( + attribute::DB_QUERY_PARAMETER, + format!("{}{}", body.creditor_account_id, body.debtor_account_id), + ); + + sqlx::query!( + "insert into account (id) + select * from unnest($1::text[]) + on conflict (id) do nothing", + &[ + body.debtor_account_id.to_string(), + body.creditor_account_id.to_string() + ] + ) + .execute(&mut *tx) + .instrument(span) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let span = info_span!("create.pseudonyms.entity"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_QUERY_TEXT, "insert into entity"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "entity"); + span.set_attribute( + attribute::DB_QUERY_PARAMETER, + format!("{}{}", body.creditor_id, body.debtor_id), + ); + let cre_dt_tm = transaction_relationship.cre_dt_tm.expect("cre_dt_tm"); + let cre_dt_tm = OffsetDateTime::try_from(cre_dt_tm).expect("offset date time conv"); + + sqlx::query!( + "insert into entity (id, cre_dt_tm) + select * from unnest($1::text[], $2::timestamptz[]) + on conflict (id) + do update set cre_dt_tm = excluded.cre_dt_tm + ", + &[body.creditor_id.to_string(), body.debtor_id.to_string()], + &[cre_dt_tm, cre_dt_tm] + ) + .execute(&mut *tx) + .instrument(span) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let account_holders = &[ + ( + body.debtor_id.to_string(), + body.creditor_account_id.to_string(), + ), + ( + body.creditor_id.to_string(), + body.creditor_account_id.to_string(), + ), + ]; + let mut deb_holder = vec![]; + let mut cred_holder = vec![]; + let mut dts = vec![]; + + account_holders.iter().for_each(|todo| { + deb_holder.push(todo.0.to_string()); + cred_holder.push(todo.1.to_string()); + dts.push(cre_dt_tm); + }); + + let span = info_span!("create.pseudonyms.account_holder"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_QUERY_TEXT, "insert into account_holder"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "account_holder"); + + sqlx::query!( + "insert into account_holder (source, destination, cre_dt_tm) + select * from unnest($1::text[], $2::text[], $3::timestamptz[]) + on conflict (source, destination) + do update set cre_dt_tm = excluded.cre_dt_tm + ", + &deb_holder, + &cred_holder, + &dts + ) + .execute(&mut *tx) + .instrument(span) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let latlng: Option<(f64, f64)> = transaction_relationship + .latlng + .map(|value| (value.latitude, value.longitude)); + + let span = info_span!("create.pseudonyms.transaction_relationship"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "insert"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "transaction_relationship"); + span.set_attribute( + attribute::DB_QUERY_TEXT, + "insert into transaction_relationship", + ); + + let amt = transaction_relationship + .amt + .ok_or_else(|| tonic::Status::data_loss("amt"))?; + + sqlx::query!( + " + insert into transaction_relationship ( + source, + destination, + amt_unit, + amt_ccy, + amt_nanos, + cre_dt_tm, + end_to_end_id, + msg_id, + pmt_inf_id, + tx_tp, + lat, + lon, + tx_sts + ) + values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ", + transaction_relationship.from, + transaction_relationship.to, + amt.units, + amt.currency_code, + amt.nanos, + cre_dt_tm, + transaction_relationship.end_to_end_id, + transaction_relationship.msg_id, + transaction_relationship.pmt_inf_id, + transaction_relationship.tx_tp, + latlng.map(|lat| lat.0), + latlng.map(|lat| lat.1), + transaction_relationship.tx_sts, + ) + .execute(&mut *tx) + .instrument(span) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let span = info_span!("transaction.commit"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "commit"); + + tx.commit() + .instrument(span) + .await + .map_err(|_e| tonic::Status::internal("database is not ready"))?; + Ok(Response::new(google::protobuf::Empty::default())) + } +} diff --git a/lib/warden-stack/Cargo.toml b/lib/warden-stack/Cargo.toml index d7c1eb8..a2e910b 100644 --- a/lib/warden-stack/Cargo.toml +++ b/lib/warden-stack/Cargo.toml @@ -58,7 +58,7 @@ opentelemetry = [ ] postgres = ["sqlx/postgres", "url/serde", "secrecy/serde"] tracing = ["dep:tracing", "tracing-subscriber/env-filter"] -opentelemetry-tonic = ["dep:tonic"] +opentelemetry-tonic = ["dep:tonic", "opentelemetry"] tracing-loki = ["dep:tracing-loki", "tracing"] [[example]] |