aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-10 13:45:11 +0200
committerrtkay123 <dev@kanjala.com>2025-08-10 13:45:11 +0200
commitdbff6fa4e5684d8636fd46ecadfe5874a253bd49 (patch)
tree1039d96c2d02fbc6df1b7b218446c3a8acf1c6b5
parent8cda165f9d3f108c80a4c9ee10c68a28299cb2d1 (diff)
downloadwarden-dbff6fa4e5684d8636fd46ecadfe5874a253bd49.tar.bz2
warden-dbff6fa4e5684d8636fd46ecadfe5874a253bd49.zip
feat(pseudonyms): serve api
-rw-r--r--Cargo.lock8
-rw-r--r--crates/pseudonyms/Cargo.toml11
-rw-r--r--crates/pseudonyms/migrations/20250810112925_account.sql3
-rw-r--r--crates/pseudonyms/migrations/20250810112930_entity.sql4
-rw-r--r--crates/pseudonyms/migrations/20250810112936_account_holder.sql6
-rw-r--r--crates/pseudonyms/migrations/20250810112945_transaction_relationship.sql29
-rw-r--r--crates/pseudonyms/pseudonyms.toml25
-rw-r--r--crates/pseudonyms/src/lib.rs23
-rw-r--r--crates/pseudonyms/src/main.rs70
-rw-r--r--crates/pseudonyms/src/server.rs65
-rw-r--r--crates/pseudonyms/src/server/interceptor.rs23
-rw-r--r--crates/pseudonyms/src/state.rs56
-rw-r--r--crates/pseudonyms/src/state/mutate.rs190
-rw-r--r--lib/warden-stack/Cargo.toml2
14 files changed, 511 insertions, 4 deletions
diff --git a/Cargo.lock b/Cargo.lock
index bc2f3f1..0bb2f03 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2827,15 +2827,18 @@ dependencies = [
"memchr",
"once_cell",
"percent-encoding",
+ "rustls",
"serde",
"serde_json",
"sha2",
"smallvec",
"thiserror 2.0.12",
+ "time",
"tokio",
"tokio-stream",
"tracing",
"url",
+ "webpki-roots 0.26.11",
]
[[package]]
@@ -2914,6 +2917,7 @@ dependencies = [
"sqlx-core",
"stringprep",
"thiserror 2.0.12",
+ "time",
"tracing",
"whoami",
]
@@ -2951,6 +2955,7 @@ dependencies = [
"sqlx-core",
"stringprep",
"thiserror 2.0.12",
+ "time",
"tracing",
"whoami",
]
@@ -2975,6 +2980,7 @@ dependencies = [
"serde_urlencoded",
"sqlx-core",
"thiserror 2.0.12",
+ "time",
"tracing",
"url",
]
@@ -3825,8 +3831,10 @@ dependencies = [
"metrics-exporter-prometheus",
"serde",
"serde_json",
+ "sqlx",
"time",
"tokio",
+ "tonic 0.14.0",
"tracing",
"warden-core",
"warden-stack",
diff --git a/crates/pseudonyms/Cargo.toml b/crates/pseudonyms/Cargo.toml
index 02ecddc..1efd5c5 100644
--- a/crates/pseudonyms/Cargo.toml
+++ b/crates/pseudonyms/Cargo.toml
@@ -15,11 +15,20 @@ metrics.workspace = true
metrics-exporter-prometheus.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
+sqlx = { workspace = true, features = [
+ "macros",
+ "migrate",
+ "postgres",
+ "runtime-tokio",
+ "time",
+ "tls-rustls",
+] }
time.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
+tonic.workspace = true
tracing.workspace = true
warden-core = { workspace = true, features = ["pseudonyms", "serde-time"] }
[dependencies.warden-stack]
workspace = true
-features = ["api", "cache", "postgres", "opentelemetry", "tracing-loki"]
+features = ["api", "cache", "postgres", "opentelemetry-tonic", "tracing-loki"]
diff --git a/crates/pseudonyms/migrations/20250810112925_account.sql b/crates/pseudonyms/migrations/20250810112925_account.sql
new file mode 100644
index 0000000..ec5ffbb
--- /dev/null
+++ b/crates/pseudonyms/migrations/20250810112925_account.sql
@@ -0,0 +1,3 @@
+create table account (
+ id varchar primary key
+);
diff --git a/crates/pseudonyms/migrations/20250810112930_entity.sql b/crates/pseudonyms/migrations/20250810112930_entity.sql
new file mode 100644
index 0000000..7edce20
--- /dev/null
+++ b/crates/pseudonyms/migrations/20250810112930_entity.sql
@@ -0,0 +1,4 @@
+create table entity (
+ id varchar primary key,
+ cre_dt_tm timestamptz not null
+);
diff --git a/crates/pseudonyms/migrations/20250810112936_account_holder.sql b/crates/pseudonyms/migrations/20250810112936_account_holder.sql
new file mode 100644
index 0000000..7d7c844
--- /dev/null
+++ b/crates/pseudonyms/migrations/20250810112936_account_holder.sql
@@ -0,0 +1,6 @@
+create table account_holder (
+ source varchar references entity(id),
+ destination varchar references account(id),
+ cre_dt_tm timestamptz not null,
+ primary key (source, destination)
+);
diff --git a/crates/pseudonyms/migrations/20250810112945_transaction_relationship.sql b/crates/pseudonyms/migrations/20250810112945_transaction_relationship.sql
new file mode 100644
index 0000000..b28b61c
--- /dev/null
+++ b/crates/pseudonyms/migrations/20250810112945_transaction_relationship.sql
@@ -0,0 +1,29 @@
+create table transaction_relationship (
+ source varchar references account(id),
+ destination varchar references account(id),
+ amt_unit bigint not null,
+ amt_ccy varchar(3) not null,
+ amt_nanos integer not null,
+ cre_dt_tm timestamptz not null,
+ end_to_end_id varchar not null check (trim(end_to_end_id) <> ''),
+ msg_id varchar not null check (trim(msg_id) <> ''),
+ pmt_inf_id varchar not null check (trim(pmt_inf_id) <> ''),
+ tx_tp varchar not null check (trim(tx_tp) <> ''),
+ lat float8,
+ lon float8,
+ tx_sts varchar,
+ primary key (msg_id, end_to_end_id, tx_tp, pmt_inf_id)
+);
+
+create index idx_transaction_status_range
+ on transaction_relationship (tx_tp, tx_sts, cre_dt_tm desc);
+
+create index idx_transaction_e2eid_tp_sts
+ on transaction_relationship (end_to_end_id, tx_tp, tx_sts);
+
+create index idx_transaction_accc_only
+ on transaction_relationship (cre_dt_tm)
+ where tx_sts = 'ACCC';
+
+create index idx_transaction_source_time
+ on transaction_relationship (source, cre_dt_tm desc);
diff --git a/crates/pseudonyms/pseudonyms.toml b/crates/pseudonyms/pseudonyms.toml
new file mode 100644
index 0000000..ec8a5a9
--- /dev/null
+++ b/crates/pseudonyms/pseudonyms.toml
@@ -0,0 +1,25 @@
+[application]
+env = "development"
+port = 1610
+
+[monitoring]
+log-level = "warden_pseudonyms=trace,info"
+opentelemetry-endpoint = "http://localhost:4317"
+loki-endpoint = "http://localhost:3100"
+
+[misc]
+something = "http://localhost:8080"
+
+[database]
+pool_size = 100
+port = 5432
+name = "pseudonyms"
+host = "localhost"
+user = "postgres"
+password = "password"
+
+[cache]
+dsn = "redis://localhost:6379"
+pooled = true
+type = "non-clustered" # clustered, non-clustered or sentinel
+max-connections = 100
diff --git a/crates/pseudonyms/src/lib.rs b/crates/pseudonyms/src/lib.rs
new file mode 100644
index 0000000..9d76245
--- /dev/null
+++ b/crates/pseudonyms/src/lib.rs
@@ -0,0 +1,23 @@
+pub mod server;
+pub mod state;
+
+use std::sync::Arc;
+
+use serde::Deserialize;
+use state::AppHandle;
+use tracing::{debug, trace};
+
+#[derive(Deserialize, Clone)]
+pub struct AppConfig {
+ pub something: Arc<str>,
+}
+
+pub async fn run(state: AppHandle, tx: tokio::sync::oneshot::Sender<u16>) -> anyhow::Result<()> {
+ trace!("running migrations");
+ sqlx::migrate!("./migrations")
+ .run(&state.services.postgres)
+ .await?;
+ debug!("ran migrations");
+
+ server::serve(state, tx).await
+}
diff --git a/crates/pseudonyms/src/main.rs b/crates/pseudonyms/src/main.rs
index e7a11a9..7b67557 100644
--- a/crates/pseudonyms/src/main.rs
+++ b/crates/pseudonyms/src/main.rs
@@ -1,3 +1,69 @@
-fn main() {
- println!("Hello, world!");
+use clap::Parser;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+use std::sync::Arc;
+use tracing::error;
+use warden_pseudonyms::state::{AppHandle, AppState};
+
+/// warden-pseudonyms
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// Path to config file
+ #[arg(short, long)]
+ config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+ let args = Args::parse();
+ let config = include_str!("../pseudonyms.toml");
+
+ let mut config = config::Config::builder()
+ .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+ if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+ config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+ };
+
+ let mut config: Configuration = config.build()?.try_deserialize()?;
+ config.application.name = env!("CARGO_CRATE_NAME").into();
+ config.application.version = env!("CARGO_PKG_VERSION").into();
+
+ let tracing = Tracing::builder()
+ .opentelemetry(&config.application, &config.monitoring)?
+ .loki(&config.application, &config.monitoring)?
+ .build(&config.monitoring);
+
+ let provider = tracing.otel_provider;
+
+ tokio::spawn(tracing.loki_task);
+
+ let mut services = Services::builder()
+ .postgres(&config.database)
+ .await
+ .inspect_err(|e| error!("database: {e}"))?
+ .cache(&config.cache)
+ .await
+ .inspect_err(|e| error!("cache: {e}"))?
+ .build();
+
+ let postgres = services
+ .postgres
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("database is not ready"))?;
+
+ let cache = services
+ .cache
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?;
+
+ let services = warden_pseudonyms::state::Services { postgres , cache};
+
+
+ let state = AppState::new(services, config, Some(provider))?;
+
+ let (tx, _rx) = tokio::sync::oneshot::channel();
+ warden_pseudonyms::run(AppHandle(Arc::new(state)), tx)
+ .await
+ .inspect_err(|e| error!("{e}"))
}
diff --git a/crates/pseudonyms/src/server.rs b/crates/pseudonyms/src/server.rs
new file mode 100644
index 0000000..88c46b4
--- /dev/null
+++ b/crates/pseudonyms/src/server.rs
@@ -0,0 +1,65 @@
+mod interceptor;
+use interceptor::MyInterceptor;
+use tokio::signal;
+use warden_core::pseudonyms::transaction_relationship::mutate_pseudonym_server::MutatePseudonymServer;
+
+use tonic::transport::{Server, server::TcpIncoming};
+use tracing::info;
+
+use crate::state::AppHandle;
+
+pub async fn serve(state: AppHandle, tx: tokio::sync::oneshot::Sender<u16>) -> anyhow::Result<()> {
+ let listener = tokio::net::TcpListener::bind(state.addr).await?;
+
+ let socket_addr = listener
+ .local_addr()
+ .expect("should get socket_addr from listener");
+
+ tx.send(socket_addr.port())
+ .expect("port channel to be open");
+
+ info!(addr = ?socket_addr, "starting server");
+
+ Server::builder()
+ .trace_fn(|_| tracing::info_span!(env!("CARGO_PKG_NAME")))
+ // .add_service(QueryUsersServer::new(state.clone()))
+ .add_service(MutatePseudonymServer::with_interceptor(
+ state.clone(),
+ MyInterceptor,
+ ))
+ .serve_with_incoming_shutdown(TcpIncoming::from(listener), shutdown_signal(state))
+ .await?;
+
+ Ok(())
+}
+async fn shutdown_signal(state: AppHandle) {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ if let Some(ref provider) = state.tracer_provider {
+ let _ = provider.shutdown();
+ }
+ },
+ _ = terminate => {
+ if let Some(ref provider) = state.tracer_provider {
+ let _ = provider.shutdown();
+ }
+ },
+ }
+}
diff --git a/crates/pseudonyms/src/server/interceptor.rs b/crates/pseudonyms/src/server/interceptor.rs
new file mode 100644
index 0000000..f30f140
--- /dev/null
+++ b/crates/pseudonyms/src/server/interceptor.rs
@@ -0,0 +1,23 @@
+use warden_stack::{
+ opentelemetry::global, tracing::telemetry::tonic::extractor,
+ tracing_opentelemetry::OpenTelemetrySpanExt,
+};
+use tonic::{Status, service::Interceptor};
+use tracing::Span;
+
+#[derive(Clone, Copy)]
+pub struct MyInterceptor;
+
+impl Interceptor for MyInterceptor {
+ fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
+ let span = Span::current();
+
+ let cx = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&extractor::MetadataMap(request.metadata()))
+ });
+
+ span.set_parent(cx);
+
+ Ok(request)
+ }
+}
diff --git a/crates/pseudonyms/src/state.rs b/crates/pseudonyms/src/state.rs
new file mode 100644
index 0000000..349c324
--- /dev/null
+++ b/crates/pseudonyms/src/state.rs
@@ -0,0 +1,56 @@
+mod mutate;
+
+use std::{
+ net::{Ipv6Addr, SocketAddr}, ops::Deref, sync::Arc
+};
+
+use sqlx::PgPool;
+use warden_stack::{cache::RedisManager, tracing::SdkTracerProvider, Configuration};
+
+use crate::AppConfig;
+
+#[derive(Clone)]
+pub struct AppHandle(pub Arc<AppState>);
+
+impl Deref for AppHandle {
+ type Target = Arc<AppState>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+#[derive(Clone)]
+pub struct Services {
+ pub postgres: PgPool,
+ pub cache: RedisManager,
+}
+
+#[derive(Clone)]
+pub struct AppState {
+ pub addr: SocketAddr,
+ pub services: Services,
+ pub config: Configuration,
+ pub app_config: AppConfig,
+ pub tracer_provider: Option<SdkTracerProvider>,
+}
+
+impl AppState {
+ pub fn new(
+ services: Services,
+ config: Configuration,
+ tracer_provider: Option<SdkTracerProvider>,
+ ) -> anyhow::Result<Self> {
+ let listen_address = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port));
+
+ let app_config: AppConfig = serde_json::from_value(config.misc.clone())?;
+
+ Ok(Self {
+ addr: listen_address,
+ services,
+ config,
+ tracer_provider,
+ app_config,
+ })
+ }
+}
diff --git a/crates/pseudonyms/src/state/mutate.rs b/crates/pseudonyms/src/state/mutate.rs
new file mode 100644
index 0000000..b0d2f7a
--- /dev/null
+++ b/crates/pseudonyms/src/state/mutate.rs
@@ -0,0 +1,190 @@
+use warden_stack::{
+ opentelemetry_semantic_conventions::attribute, tracing_opentelemetry::OpenTelemetrySpanExt,
+};
+use time::OffsetDateTime;
+use tonic::{Request, Response, Status};
+use tracing::{Instrument, info_span, instrument};
+use warden_core::{
+ google,
+ pseudonyms::transaction_relationship::{
+ CreatePseudonymRequest, mutate_pseudonym_server::MutatePseudonym,
+ },
+};
+
+use crate::state::AppHandle;
+
+#[tonic::async_trait]
+impl MutatePseudonym for AppHandle {
+ #[instrument(skip(self, request), err(Debug))]
+ async fn create_pseudonym(
+ &self,
+ request: Request<CreatePseudonymRequest>,
+ ) -> Result<Response<google::protobuf::Empty>, Status> {
+ let body = request.into_inner();
+ let transaction_relationship = body
+ .transaction_relationship
+ .ok_or_else(|| tonic::Status::data_loss("transaction_relationship"))?;
+ let mut tx = self
+ .services
+ .postgres
+ .begin()
+ .await
+ .map_err(|_e| tonic::Status::internal("database is not ready"))?;
+
+ let span = info_span!("create.pseudonyms.account");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_QUERY_TEXT, "insert into account");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "account");
+ span.set_attribute(
+ attribute::DB_QUERY_PARAMETER,
+ format!("{}{}", body.creditor_account_id, body.debtor_account_id),
+ );
+
+ sqlx::query!(
+ "insert into account (id)
+ select * from unnest($1::text[])
+ on conflict (id) do nothing",
+ &[
+ body.debtor_account_id.to_string(),
+ body.creditor_account_id.to_string()
+ ]
+ )
+ .execute(&mut *tx)
+ .instrument(span)
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let span = info_span!("create.pseudonyms.entity");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_QUERY_TEXT, "insert into entity");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "entity");
+ span.set_attribute(
+ attribute::DB_QUERY_PARAMETER,
+ format!("{}{}", body.creditor_id, body.debtor_id),
+ );
+ let cre_dt_tm = transaction_relationship.cre_dt_tm.expect("cre_dt_tm");
+ let cre_dt_tm = OffsetDateTime::try_from(cre_dt_tm).expect("offset date time conv");
+
+ sqlx::query!(
+ "insert into entity (id, cre_dt_tm)
+ select * from unnest($1::text[], $2::timestamptz[])
+ on conflict (id)
+ do update set cre_dt_tm = excluded.cre_dt_tm
+ ",
+ &[body.creditor_id.to_string(), body.debtor_id.to_string()],
+ &[cre_dt_tm, cre_dt_tm]
+ )
+ .execute(&mut *tx)
+ .instrument(span)
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let account_holders = &[
+ (
+ body.debtor_id.to_string(),
+ body.creditor_account_id.to_string(),
+ ),
+ (
+ body.creditor_id.to_string(),
+ body.creditor_account_id.to_string(),
+ ),
+ ];
+ let mut deb_holder = vec![];
+ let mut cred_holder = vec![];
+ let mut dts = vec![];
+
+ account_holders.iter().for_each(|todo| {
+ deb_holder.push(todo.0.to_string());
+ cred_holder.push(todo.1.to_string());
+ dts.push(cre_dt_tm);
+ });
+
+ let span = info_span!("create.pseudonyms.account_holder");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_QUERY_TEXT, "insert into account_holder");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "account_holder");
+
+ sqlx::query!(
+ "insert into account_holder (source, destination, cre_dt_tm)
+ select * from unnest($1::text[], $2::text[], $3::timestamptz[])
+ on conflict (source, destination)
+ do update set cre_dt_tm = excluded.cre_dt_tm
+ ",
+ &deb_holder,
+ &cred_holder,
+ &dts
+ )
+ .execute(&mut *tx)
+ .instrument(span)
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let latlng: Option<(f64, f64)> = transaction_relationship
+ .latlng
+ .map(|value| (value.latitude, value.longitude));
+
+ let span = info_span!("create.pseudonyms.transaction_relationship");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "transaction_relationship");
+ span.set_attribute(
+ attribute::DB_QUERY_TEXT,
+ "insert into transaction_relationship",
+ );
+
+ let amt = transaction_relationship
+ .amt
+ .ok_or_else(|| tonic::Status::data_loss("amt"))?;
+
+ sqlx::query!(
+ "
+ insert into transaction_relationship (
+ source,
+ destination,
+ amt_unit,
+ amt_ccy,
+ amt_nanos,
+ cre_dt_tm,
+ end_to_end_id,
+ msg_id,
+ pmt_inf_id,
+ tx_tp,
+ lat,
+ lon,
+ tx_sts
+ )
+ values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
+ ",
+ transaction_relationship.from,
+ transaction_relationship.to,
+ amt.units,
+ amt.currency_code,
+ amt.nanos,
+ cre_dt_tm,
+ transaction_relationship.end_to_end_id,
+ transaction_relationship.msg_id,
+ transaction_relationship.pmt_inf_id,
+ transaction_relationship.tx_tp,
+ latlng.map(|lat| lat.0),
+ latlng.map(|lat| lat.1),
+ transaction_relationship.tx_sts,
+ )
+ .execute(&mut *tx)
+ .instrument(span)
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let span = info_span!("transaction.commit");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "commit");
+
+ tx.commit()
+ .instrument(span)
+ .await
+ .map_err(|_e| tonic::Status::internal("database is not ready"))?;
+ Ok(Response::new(google::protobuf::Empty::default()))
+ }
+}
diff --git a/lib/warden-stack/Cargo.toml b/lib/warden-stack/Cargo.toml
index d7c1eb8..a2e910b 100644
--- a/lib/warden-stack/Cargo.toml
+++ b/lib/warden-stack/Cargo.toml
@@ -58,7 +58,7 @@ opentelemetry = [
]
postgres = ["sqlx/postgres", "url/serde", "secrecy/serde"]
tracing = ["dep:tracing", "tracing-subscriber/env-filter"]
-opentelemetry-tonic = ["dep:tonic"]
+opentelemetry-tonic = ["dep:tonic", "opentelemetry"]
tracing-loki = ["dep:tracing-loki", "tracing"]
[[example]]