From 73d7bab8844bb21c7a9143c30800c2d11d411e42 Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Sun, 17 Aug 2025 20:02:49 +0200 Subject: feat: typology processor (#8) --- crates/typologies/src/processor/publish.rs | 44 ++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 crates/typologies/src/processor/publish.rs (limited to 'crates/typologies/src/processor/publish.rs') diff --git a/crates/typologies/src/processor/publish.rs b/crates/typologies/src/processor/publish.rs new file mode 100644 index 0000000..b031bf3 --- /dev/null +++ b/crates/typologies/src/processor/publish.rs @@ -0,0 +1,44 @@ +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing::{Instrument, Span, debug, info_span, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::message::Payload; +use warden_stack::tracing::telemetry::nats::injector; + +use crate::state::AppHandle; + +pub(crate) async fn to_tadp( + subject: &str, + state: AppHandle, + payload: Payload, +) -> anyhow::Result<()> { + // send transaction to next with nats + let subject = format!("{}.{}", state.config.nats.destination_prefix, subject); + debug!(subject = ?subject, "publishing"); + + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + span.set_attribute(attribute::MESSAGING_SYSTEM, "nats"); + state + .services + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .instrument(span) + .await + .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?; + + Ok(()) +} -- cgit v1.2.3