aboutsummaryrefslogtreecommitdiffstats
path: root/crates/router/src/processor/publish.rs
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-12 21:05:07 +0200
committerrtkay123 <dev@kanjala.com>2025-08-12 21:05:07 +0200
commitb6924a50c9ec49e1b2b0d286abbbe608410af87d (patch)
tree9c5cf583fedfbb585985ac829bbdfdadce1571fe /crates/router/src/processor/publish.rs
parentd75b5fc9c0497f56e6b8602d8ff8991bfaeff18c (diff)
downloadwarden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.tar.bz2
warden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.zip
feat(router): get config
Diffstat (limited to 'crates/router/src/processor/publish.rs')
-rw-r--r--crates/router/src/processor/publish.rs51
1 files changed, 51 insertions, 0 deletions
diff --git a/crates/router/src/processor/publish.rs b/crates/router/src/processor/publish.rs
new file mode 100644
index 0000000..16dcec8
--- /dev/null
+++ b/crates/router/src/processor/publish.rs
@@ -0,0 +1,51 @@
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use tracing::{Instrument, Span, debug, info, info_span, warn};
+use warden_core::{configuration::routing::RoutingConfiguration, message::Payload};
+use warden_stack::tracing::telemetry::nats::injector;
+
+use crate::state::AppHandle;
+
+pub(crate) async fn to_rule(
+ (rule_id, rule_version): (&String, &str),
+ state: AppHandle,
+ mut payload: Payload,
+ routing: RoutingConfiguration,
+) -> anyhow::Result<()> {
+ // send transaction to next with nats
+ let subject = format!(
+ "{}.{rule_id}.v{rule_version}",
+ state.config.nats.destination_prefix
+ );
+ debug!(subject = ?subject, "publishing");
+
+ payload.routing = Some(routing);
+
+ let payload = prost::Message::encode_to_vec(&payload);
+
+ let mut headers = async_nats::HeaderMap::new();
+
+ let cx = Span::current().context();
+
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers))
+ });
+
+ let span = info_span!("nats.publish");
+ span.set_attribute(
+ attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
+ subject.to_string(),
+ );
+ state
+ .services
+ .jetstream
+ .publish_with_headers(subject.clone(), headers, payload.into())
+ .instrument(span)
+ .await
+ .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?;
+
+ info!("published to rule");
+
+ Ok(())
+}