aboutsummaryrefslogtreecommitdiffstats
path: root/crates/pseudonyms/src
diff options
context:
space:
mode:
Diffstat (limited to 'crates/pseudonyms/src')
-rw-r--r--crates/pseudonyms/src/lib.rs23
-rw-r--r--crates/pseudonyms/src/main.rs70
-rw-r--r--crates/pseudonyms/src/server.rs65
-rw-r--r--crates/pseudonyms/src/server/interceptor.rs23
-rw-r--r--crates/pseudonyms/src/state.rs56
-rw-r--r--crates/pseudonyms/src/state/mutate.rs190
6 files changed, 425 insertions, 2 deletions
diff --git a/crates/pseudonyms/src/lib.rs b/crates/pseudonyms/src/lib.rs
new file mode 100644
index 0000000..9d76245
--- /dev/null
+++ b/crates/pseudonyms/src/lib.rs
@@ -0,0 +1,23 @@
+pub mod server;
+pub mod state;
+
+use std::sync::Arc;
+
+use serde::Deserialize;
+use state::AppHandle;
+use tracing::{debug, trace};
+
+#[derive(Deserialize, Clone)]
+pub struct AppConfig {
+ pub something: Arc<str>,
+}
+
+pub async fn run(state: AppHandle, tx: tokio::sync::oneshot::Sender<u16>) -> anyhow::Result<()> {
+ trace!("running migrations");
+ sqlx::migrate!("./migrations")
+ .run(&state.services.postgres)
+ .await?;
+ debug!("ran migrations");
+
+ server::serve(state, tx).await
+}
diff --git a/crates/pseudonyms/src/main.rs b/crates/pseudonyms/src/main.rs
index e7a11a9..7b67557 100644
--- a/crates/pseudonyms/src/main.rs
+++ b/crates/pseudonyms/src/main.rs
@@ -1,3 +1,69 @@
-fn main() {
- println!("Hello, world!");
+use clap::Parser;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+use std::sync::Arc;
+use tracing::error;
+use warden_pseudonyms::state::{AppHandle, AppState};
+
+/// warden-pseudonyms
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// Path to config file
+ #[arg(short, long)]
+ config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+ let args = Args::parse();
+ let config = include_str!("../pseudonyms.toml");
+
+ let mut config = config::Config::builder()
+ .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+ if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+ config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+ };
+
+ let mut config: Configuration = config.build()?.try_deserialize()?;
+ config.application.name = env!("CARGO_CRATE_NAME").into();
+ config.application.version = env!("CARGO_PKG_VERSION").into();
+
+ let tracing = Tracing::builder()
+ .opentelemetry(&config.application, &config.monitoring)?
+ .loki(&config.application, &config.monitoring)?
+ .build(&config.monitoring);
+
+ let provider = tracing.otel_provider;
+
+ tokio::spawn(tracing.loki_task);
+
+ let mut services = Services::builder()
+ .postgres(&config.database)
+ .await
+ .inspect_err(|e| error!("database: {e}"))?
+ .cache(&config.cache)
+ .await
+ .inspect_err(|e| error!("cache: {e}"))?
+ .build();
+
+ let postgres = services
+ .postgres
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("database is not ready"))?;
+
+ let cache = services
+ .cache
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?;
+
+ let services = warden_pseudonyms::state::Services { postgres , cache};
+
+
+ let state = AppState::new(services, config, Some(provider))?;
+
+ let (tx, _rx) = tokio::sync::oneshot::channel();
+ warden_pseudonyms::run(AppHandle(Arc::new(state)), tx)
+ .await
+ .inspect_err(|e| error!("{e}"))
}
diff --git a/crates/pseudonyms/src/server.rs b/crates/pseudonyms/src/server.rs
new file mode 100644
index 0000000..88c46b4
--- /dev/null
+++ b/crates/pseudonyms/src/server.rs
@@ -0,0 +1,65 @@
+mod interceptor;
+use interceptor::MyInterceptor;
+use tokio::signal;
+use warden_core::pseudonyms::transaction_relationship::mutate_pseudonym_server::MutatePseudonymServer;
+
+use tonic::transport::{Server, server::TcpIncoming};
+use tracing::info;
+
+use crate::state::AppHandle;
+
+pub async fn serve(state: AppHandle, tx: tokio::sync::oneshot::Sender<u16>) -> anyhow::Result<()> {
+ let listener = tokio::net::TcpListener::bind(state.addr).await?;
+
+ let socket_addr = listener
+ .local_addr()
+ .expect("should get socket_addr from listener");
+
+ tx.send(socket_addr.port())
+ .expect("port channel to be open");
+
+ info!(addr = ?socket_addr, "starting server");
+
+ Server::builder()
+ .trace_fn(|_| tracing::info_span!(env!("CARGO_PKG_NAME")))
+ // .add_service(QueryUsersServer::new(state.clone()))
+ .add_service(MutatePseudonymServer::with_interceptor(
+ state.clone(),
+ MyInterceptor,
+ ))
+ .serve_with_incoming_shutdown(TcpIncoming::from(listener), shutdown_signal(state))
+ .await?;
+
+ Ok(())
+}
+async fn shutdown_signal(state: AppHandle) {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ if let Some(ref provider) = state.tracer_provider {
+ let _ = provider.shutdown();
+ }
+ },
+ _ = terminate => {
+ if let Some(ref provider) = state.tracer_provider {
+ let _ = provider.shutdown();
+ }
+ },
+ }
+}
diff --git a/crates/pseudonyms/src/server/interceptor.rs b/crates/pseudonyms/src/server/interceptor.rs
new file mode 100644
index 0000000..f30f140
--- /dev/null
+++ b/crates/pseudonyms/src/server/interceptor.rs
@@ -0,0 +1,23 @@
+use warden_stack::{
+ opentelemetry::global, tracing::telemetry::tonic::extractor,
+ tracing_opentelemetry::OpenTelemetrySpanExt,
+};
+use tonic::{Status, service::Interceptor};
+use tracing::Span;
+
+#[derive(Clone, Copy)]
+pub struct MyInterceptor;
+
+impl Interceptor for MyInterceptor {
+ fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
+ let span = Span::current();
+
+ let cx = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&extractor::MetadataMap(request.metadata()))
+ });
+
+ span.set_parent(cx);
+
+ Ok(request)
+ }
+}
diff --git a/crates/pseudonyms/src/state.rs b/crates/pseudonyms/src/state.rs
new file mode 100644
index 0000000..349c324
--- /dev/null
+++ b/crates/pseudonyms/src/state.rs
@@ -0,0 +1,56 @@
+mod mutate;
+
+use std::{
+ net::{Ipv6Addr, SocketAddr}, ops::Deref, sync::Arc
+};
+
+use sqlx::PgPool;
+use warden_stack::{cache::RedisManager, tracing::SdkTracerProvider, Configuration};
+
+use crate::AppConfig;
+
+#[derive(Clone)]
+pub struct AppHandle(pub Arc<AppState>);
+
+impl Deref for AppHandle {
+ type Target = Arc<AppState>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+#[derive(Clone)]
+pub struct Services {
+ pub postgres: PgPool,
+ pub cache: RedisManager,
+}
+
+#[derive(Clone)]
+pub struct AppState {
+ pub addr: SocketAddr,
+ pub services: Services,
+ pub config: Configuration,
+ pub app_config: AppConfig,
+ pub tracer_provider: Option<SdkTracerProvider>,
+}
+
+impl AppState {
+ pub fn new(
+ services: Services,
+ config: Configuration,
+ tracer_provider: Option<SdkTracerProvider>,
+ ) -> anyhow::Result<Self> {
+ let listen_address = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port));
+
+ let app_config: AppConfig = serde_json::from_value(config.misc.clone())?;
+
+ Ok(Self {
+ addr: listen_address,
+ services,
+ config,
+ tracer_provider,
+ app_config,
+ })
+ }
+}
diff --git a/crates/pseudonyms/src/state/mutate.rs b/crates/pseudonyms/src/state/mutate.rs
new file mode 100644
index 0000000..b0d2f7a
--- /dev/null
+++ b/crates/pseudonyms/src/state/mutate.rs
@@ -0,0 +1,190 @@
+use warden_stack::{
+ opentelemetry_semantic_conventions::attribute, tracing_opentelemetry::OpenTelemetrySpanExt,
+};
+use time::OffsetDateTime;
+use tonic::{Request, Response, Status};
+use tracing::{Instrument, info_span, instrument};
+use warden_core::{
+ google,
+ pseudonyms::transaction_relationship::{
+ CreatePseudonymRequest, mutate_pseudonym_server::MutatePseudonym,
+ },
+};
+
+use crate::state::AppHandle;
+
+#[tonic::async_trait]
+impl MutatePseudonym for AppHandle {
+ #[instrument(skip(self, request), err(Debug))]
+ async fn create_pseudonym(
+ &self,
+ request: Request<CreatePseudonymRequest>,
+ ) -> Result<Response<google::protobuf::Empty>, Status> {
+ let body = request.into_inner();
+ let transaction_relationship = body
+ .transaction_relationship
+ .ok_or_else(|| tonic::Status::data_loss("transaction_relationship"))?;
+ let mut tx = self
+ .services
+ .postgres
+ .begin()
+ .await
+ .map_err(|_e| tonic::Status::internal("database is not ready"))?;
+
+ let span = info_span!("create.pseudonyms.account");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_QUERY_TEXT, "insert into account");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "account");
+ span.set_attribute(
+ attribute::DB_QUERY_PARAMETER,
+ format!("{}{}", body.creditor_account_id, body.debtor_account_id),
+ );
+
+ sqlx::query!(
+ "insert into account (id)
+ select * from unnest($1::text[])
+ on conflict (id) do nothing",
+ &[
+ body.debtor_account_id.to_string(),
+ body.creditor_account_id.to_string()
+ ]
+ )
+ .execute(&mut *tx)
+ .instrument(span)
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let span = info_span!("create.pseudonyms.entity");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_QUERY_TEXT, "insert into entity");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "entity");
+ span.set_attribute(
+ attribute::DB_QUERY_PARAMETER,
+ format!("{}{}", body.creditor_id, body.debtor_id),
+ );
+ let cre_dt_tm = transaction_relationship.cre_dt_tm.expect("cre_dt_tm");
+ let cre_dt_tm = OffsetDateTime::try_from(cre_dt_tm).expect("offset date time conv");
+
+ sqlx::query!(
+ "insert into entity (id, cre_dt_tm)
+ select * from unnest($1::text[], $2::timestamptz[])
+ on conflict (id)
+ do update set cre_dt_tm = excluded.cre_dt_tm
+ ",
+ &[body.creditor_id.to_string(), body.debtor_id.to_string()],
+ &[cre_dt_tm, cre_dt_tm]
+ )
+ .execute(&mut *tx)
+ .instrument(span)
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let account_holders = &[
+ (
+ body.debtor_id.to_string(),
+ body.creditor_account_id.to_string(),
+ ),
+ (
+ body.creditor_id.to_string(),
+ body.creditor_account_id.to_string(),
+ ),
+ ];
+ let mut deb_holder = vec![];
+ let mut cred_holder = vec![];
+ let mut dts = vec![];
+
+ account_holders.iter().for_each(|todo| {
+ deb_holder.push(todo.0.to_string());
+ cred_holder.push(todo.1.to_string());
+ dts.push(cre_dt_tm);
+ });
+
+ let span = info_span!("create.pseudonyms.account_holder");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_QUERY_TEXT, "insert into account_holder");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "account_holder");
+
+ sqlx::query!(
+ "insert into account_holder (source, destination, cre_dt_tm)
+ select * from unnest($1::text[], $2::text[], $3::timestamptz[])
+ on conflict (source, destination)
+ do update set cre_dt_tm = excluded.cre_dt_tm
+ ",
+ &deb_holder,
+ &cred_holder,
+ &dts
+ )
+ .execute(&mut *tx)
+ .instrument(span)
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let latlng: Option<(f64, f64)> = transaction_relationship
+ .latlng
+ .map(|value| (value.latitude, value.longitude));
+
+ let span = info_span!("create.pseudonyms.transaction_relationship");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "transaction_relationship");
+ span.set_attribute(
+ attribute::DB_QUERY_TEXT,
+ "insert into transaction_relationship",
+ );
+
+ let amt = transaction_relationship
+ .amt
+ .ok_or_else(|| tonic::Status::data_loss("amt"))?;
+
+ sqlx::query!(
+ "
+ insert into transaction_relationship (
+ source,
+ destination,
+ amt_unit,
+ amt_ccy,
+ amt_nanos,
+ cre_dt_tm,
+ end_to_end_id,
+ msg_id,
+ pmt_inf_id,
+ tx_tp,
+ lat,
+ lon,
+ tx_sts
+ )
+ values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
+ ",
+ transaction_relationship.from,
+ transaction_relationship.to,
+ amt.units,
+ amt.currency_code,
+ amt.nanos,
+ cre_dt_tm,
+ transaction_relationship.end_to_end_id,
+ transaction_relationship.msg_id,
+ transaction_relationship.pmt_inf_id,
+ transaction_relationship.tx_tp,
+ latlng.map(|lat| lat.0),
+ latlng.map(|lat| lat.1),
+ transaction_relationship.tx_sts,
+ )
+ .execute(&mut *tx)
+ .instrument(span)
+ .await
+ .map_err(|e| tonic::Status::internal(e.to_string()))?;
+
+ let span = info_span!("transaction.commit");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "commit");
+
+ tx.commit()
+ .instrument(span)
+ .await
+ .map_err(|_e| tonic::Status::internal("database is not ready"))?;
+ Ok(Response::new(google::protobuf::Empty::default()))
+ }
+}