aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-11 13:12:39 +0200
committerrtkay123 <dev@kanjala.com>2025-08-11 13:12:48 +0200
commit968f8e837f8e383758d4388a00625982093dd29b (patch)
tree238169b35658cf0a8f6913fc6b7b1ab6f24655f0
parent418b7bcea86d6838b704ee8d3f55501debd5ee21 (diff)
downloadwarden-968f8e837f8e383758d4388a00625982093dd29b.tar.bz2
warden-968f8e837f8e383758d4388a00625982093dd29b.zip
feat(warden): otel span kind
for service graphs in tempo
-rw-r--r--crates/pseudonyms/src/server.rs3
-rw-r--r--crates/pseudonyms/src/state/mutate.rs10
-rw-r--r--crates/warden/Cargo.toml2
-rw-r--r--crates/warden/src/server/publish.rs1
-rw-r--r--crates/warden/src/server/routes.rs11
-rw-r--r--crates/warden/src/server/routes/processor/pacs002.rs20
-rw-r--r--crates/warden/src/server/routes/processor/pacs008.rs12
-rw-r--r--crates/warden/src/state.rs4
-rw-r--r--lib/warden-core/build.rs4
-rw-r--r--lib/warden-core/src/google.rs2
-rw-r--r--lib/warden-core/src/google/parser.rs2
11 files changed, 48 insertions, 23 deletions
diff --git a/crates/pseudonyms/src/server.rs b/crates/pseudonyms/src/server.rs
index 88c46b4..4b33375 100644
--- a/crates/pseudonyms/src/server.rs
+++ b/crates/pseudonyms/src/server.rs
@@ -21,8 +21,7 @@ pub async fn serve(state: AppHandle, tx: tokio::sync::oneshot::Sender<u16>) -> a
info!(addr = ?socket_addr, "starting server");
Server::builder()
- .trace_fn(|_| tracing::info_span!(env!("CARGO_PKG_NAME")))
- // .add_service(QueryUsersServer::new(state.clone()))
+ .trace_fn(|_| tracing::trace_span!(env!("CARGO_PKG_NAME"), "otel.kind" = "server"))
.add_service(MutatePseudonymServer::with_interceptor(
state.clone(),
MyInterceptor,
diff --git a/crates/pseudonyms/src/state/mutate.rs b/crates/pseudonyms/src/state/mutate.rs
index 6a737b5..de331d2 100644
--- a/crates/pseudonyms/src/state/mutate.rs
+++ b/crates/pseudonyms/src/state/mutate.rs
@@ -1,6 +1,6 @@
use time::OffsetDateTime;
use tonic::{Request, Response, Status};
-use tracing::{Instrument, info_span, instrument};
+use tracing::{Instrument, debug, info_span, instrument, trace};
use warden_core::{
google,
pseudonyms::transaction_relationship::{
@@ -21,9 +21,12 @@ impl MutatePseudonym for AppHandle {
request: Request<CreatePseudonymRequest>,
) -> Result<Response<google::protobuf::Empty>, Status> {
let body = request.into_inner();
+ trace!("extracting transaction relationship");
let transaction_relationship = body
.transaction_relationship
.ok_or_else(|| tonic::Status::data_loss("transaction_relationship"))?;
+
+ debug!("starting database transaction");
let mut tx = self
.services
.postgres
@@ -41,6 +44,7 @@ impl MutatePseudonym for AppHandle {
format!("{}{}", body.creditor_account_id, body.debtor_account_id),
);
+ trace!("inserting account");
sqlx::query!(
"insert into account (id)
select * from unnest($1::text[])
@@ -67,6 +71,7 @@ impl MutatePseudonym for AppHandle {
let cre_dt_tm = transaction_relationship.cre_dt_tm.expect("cre_dt_tm");
let cre_dt_tm = OffsetDateTime::try_from(cre_dt_tm).expect("offset date time conv");
+ trace!("inserting entity");
sqlx::query!(
"insert into entity (id, cre_dt_tm)
select * from unnest($1::text[], $2::timestamptz[])
@@ -107,6 +112,7 @@ impl MutatePseudonym for AppHandle {
span.set_attribute(attribute::DB_QUERY_TEXT, "insert into account_holder");
span.set_attribute(attribute::DB_COLLECTION_NAME, "account_holder");
+ trace!("inserting account holders");
sqlx::query!(
"insert into account_holder (source, destination, cre_dt_tm)
select * from unnest($1::text[], $2::text[], $3::timestamptz[])
@@ -126,6 +132,7 @@ impl MutatePseudonym for AppHandle {
.latlng
.map(|value| (value.latitude, value.longitude));
+ trace!("inserting transaction relationship");
let span = info_span!("create.pseudonyms.transaction_relationship");
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
@@ -181,6 +188,7 @@ impl MutatePseudonym for AppHandle {
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
span.set_attribute(attribute::DB_OPERATION_NAME, "commit");
+ debug!("commiting transaction");
tx.commit()
.instrument(span)
.await
diff --git a/crates/warden/Cargo.toml b/crates/warden/Cargo.toml
index 5a40ee5..033937b 100644
--- a/crates/warden/Cargo.toml
+++ b/crates/warden/Cargo.toml
@@ -30,7 +30,7 @@ sqlx = { workspace = true, features = [
"tls-rustls",
"uuid",
] }
-time.workspace = true
+time = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
tonic.workspace = true
tower-http = { workspace = true, features = [
diff --git a/crates/warden/src/server/publish.rs b/crates/warden/src/server/publish.rs
index b3df0a7..89922d4 100644
--- a/crates/warden/src/server/publish.rs
+++ b/crates/warden/src/server/publish.rs
@@ -22,6 +22,7 @@ pub async fn publish_message(state: &AppHandle, payload: Payload, msg_id: &str)
});
let span = info_span!("nats.publish");
+ span.set_attribute("otel.kind", "producer");
span.set_attribute(
attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
subject.to_string(),
diff --git a/crates/warden/src/server/routes.rs b/crates/warden/src/server/routes.rs
index d3b05c6..fe1dba6 100644
--- a/crates/warden/src/server/routes.rs
+++ b/crates/warden/src/server/routes.rs
@@ -38,7 +38,16 @@ mod tests {
let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
let jetstream = async_nats::jetstream::new(client);
- let state = AppState::create(Services {postgres: pool,cache, jetstream}, &test_config()).await.unwrap();
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
let app = server::router(state);
let response = app
diff --git a/crates/warden/src/server/routes/processor/pacs002.rs b/crates/warden/src/server/routes/processor/pacs002.rs
index 7a15d7a..9237afe 100644
--- a/crates/warden/src/server/routes/processor/pacs002.rs
+++ b/crates/warden/src/server/routes/processor/pacs002.rs
@@ -13,7 +13,18 @@ use warden_core::{
};
use warden_stack::redis::AsyncCommands;
-use crate::{error::AppError, server::{publish::publish_message, routes::{processor::pacs008::{build_data_cache, set_cache}, PACS002_001_12}}, state::AppHandle, version::Version};
+use crate::{
+ error::AppError,
+ server::{
+ publish::publish_message,
+ routes::{
+ PACS002_001_12,
+ processor::pacs008::{build_data_cache, set_cache},
+ },
+ },
+ state::AppHandle,
+ version::Version,
+};
#[derive(Serialize)]
struct Row {
@@ -73,12 +84,7 @@ pub async fn post_pacs002(
let data_cache = match cache {
Ok(Ok(data_cache)) => {
debug!(end_to_end_id = end_to_end_id, "cache hit");
- rebuild_entities(
- end_to_end_id,
- &state,
- Some(data_cache),
- )
- .await?
+ rebuild_entities(end_to_end_id, &state, Some(data_cache)).await?
}
_ => {
debug!(end_to_end_id = end_to_end_id, "cache miss");
diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs
index 64a1029..6924212 100644
--- a/crates/warden/src/server/routes/processor/pacs008.rs
+++ b/crates/warden/src/server/routes/processor/pacs008.rs
@@ -1,5 +1,5 @@
use axum::{extract::State, response::IntoResponse};
-use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace, warn};
+use tracing::{Instrument, Span, debug, error, info, instrument, trace, trace_span, warn};
use uuid::Uuid;
use warden_core::{
google::r#type::Money,
@@ -128,8 +128,11 @@ pub(super) async fn post_pacs008(
trace!("updating pseudonyms");
let pseudonyms_fut = async {
- let span = info_span!("create.pseudonyms.account");
- span.set_attribute(attribute::RPC_SERVICE, "pseudonyms");
+ let span = trace_span!(
+ "create.pseudonyms.account",
+ "otel.kind" = "client",
+ "rpc.service" = "pseudonyms"
+ );
pseudonyms_client
.create_pseudonym(request)
.instrument(span)
@@ -149,8 +152,9 @@ pub(super) async fn post_pacs008(
let id = Uuid::now_v7();
debug!(%id, "inserting transaction into history");
- let span = info_span!("create.transaction_history.pacs008");
+ let span = trace_span!("create.transaction_history.pacs008");
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute("otel.kind", "client");
span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
span.set_attribute(attribute::DB_COLLECTION_NAME, "pacs008");
diff --git a/crates/warden/src/state.rs b/crates/warden/src/state.rs
index eebb56b..628225c 100644
--- a/crates/warden/src/state.rs
+++ b/crates/warden/src/state.rs
@@ -4,7 +4,7 @@ use std::{ops::Deref, sync::Arc};
use tonic::transport::Endpoint;
use tracing::error;
use warden_core::pseudonyms::transaction_relationship::mutate_pseudonym_client::MutatePseudonymClient;
-use warden_stack::{Configuration, Environment, cache::RedisManager};
+use warden_stack::{Configuration, cache::RedisManager};
use crate::{
cnfg::LocalConfig,
@@ -31,7 +31,6 @@ pub struct Services {
}
pub struct AppState {
- pub environment: Environment,
pub mutate_pseudonym_client: MutatePseudonymClient<Intercepted>,
pub services: Services,
pub app_config: LocalConfig,
@@ -53,7 +52,6 @@ impl AppState {
MutatePseudonymClient::with_interceptor(channel, MyInterceptor);
Ok(AppHandle(Arc::new(Self {
- environment: configuration.application.env,
mutate_pseudonym_client,
services,
app_config: local_config,
diff --git a/lib/warden-core/build.rs b/lib/warden-core/build.rs
index 1f71f35..37c1c68 100644
--- a/lib/warden-core/build.rs
+++ b/lib/warden-core/build.rs
@@ -92,8 +92,8 @@ fn add_serde(config: tonic_prost_build::Builder) -> tonic_prost_build::Builder {
#[cfg(feature = "serde-time")]
let config = config.type_attribute(
".google.protobuf.Timestamp",
- "#[serde(try_from = \"time::OffsetDateTime\")] #[serde(into = \"String\")]",
- );
+ "#[serde(try_from = \"crate::google::parser::dt::DateItem\")] #[serde(into = \"String\")]",
+ ).type_attribute(".google.type.Date", "#[serde(try_from = \"crate::google::parser::dt::DateItem\")] #[serde(into = \"String\")]");
config
}
diff --git a/lib/warden-core/src/google.rs b/lib/warden-core/src/google.rs
index 0e9487d..30accb9 100644
--- a/lib/warden-core/src/google.rs
+++ b/lib/warden-core/src/google.rs
@@ -1,4 +1,4 @@
-mod parser;
+pub(crate) mod parser;
/// Well known types
pub mod protobuf {
diff --git a/lib/warden-core/src/google/parser.rs b/lib/warden-core/src/google/parser.rs
index 7f160a3..c17a9fb 100644
--- a/lib/warden-core/src/google/parser.rs
+++ b/lib/warden-core/src/google/parser.rs
@@ -1,5 +1,5 @@
#[cfg(feature = "time")]
-mod dt;
+pub mod dt;
#[cfg(feature = "pseudonyms")]
mod money;