diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-12 21:05:07 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-12 21:05:07 +0200 |
commit | b6924a50c9ec49e1b2b0d286abbbe608410af87d (patch) | |
tree | 9c5cf583fedfbb585985ac829bbdfdadce1571fe /crates/router/src/processor/publish.rs | |
parent | d75b5fc9c0497f56e6b8602d8ff8991bfaeff18c (diff) | |
download | warden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.tar.bz2 warden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.zip |
feat(router): get config
Diffstat (limited to 'crates/router/src/processor/publish.rs')
-rw-r--r-- | crates/router/src/processor/publish.rs | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/crates/router/src/processor/publish.rs b/crates/router/src/processor/publish.rs new file mode 100644 index 0000000..16dcec8 --- /dev/null +++ b/crates/router/src/processor/publish.rs @@ -0,0 +1,51 @@ +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{Instrument, Span, debug, info, info_span, warn}; +use warden_core::{configuration::routing::RoutingConfiguration, message::Payload}; +use warden_stack::tracing::telemetry::nats::injector; + +use crate::state::AppHandle; + +pub(crate) async fn to_rule( + (rule_id, rule_version): (&String, &str), + state: AppHandle, + mut payload: Payload, + routing: RoutingConfiguration, +) -> anyhow::Result<()> { + // send transaction to next with nats + let subject = format!( + "{}.{rule_id}.v{rule_version}", + state.config.nats.destination_prefix + ); + debug!(subject = ?subject, "publishing"); + + payload.routing = Some(routing); + + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + state + .services + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .instrument(span) + .await + .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?; + + info!("published to rule"); + + Ok(()) +} |