aboutsummaryrefslogtreecommitdiffstats
path: root/crates/router/src/processor.rs
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-12 21:05:07 +0200
committerrtkay123 <dev@kanjala.com>2025-08-12 21:05:07 +0200
commitb6924a50c9ec49e1b2b0d286abbbe608410af87d (patch)
tree9c5cf583fedfbb585985ac829bbdfdadce1571fe /crates/router/src/processor.rs
parentd75b5fc9c0497f56e6b8602d8ff8991bfaeff18c (diff)
downloadwarden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.tar.bz2
warden-b6924a50c9ec49e1b2b0d286abbbe608410af87d.zip
feat(router): get config
Diffstat (limited to 'crates/router/src/processor.rs')
-rw-r--r--crates/router/src/processor.rs108
1 files changed, 108 insertions, 0 deletions
diff --git a/crates/router/src/processor.rs b/crates/router/src/processor.rs
new file mode 100644
index 0000000..b8c69f3
--- /dev/null
+++ b/crates/router/src/processor.rs
@@ -0,0 +1,108 @@
+pub mod grpc;
+mod reload;
+mod route;
+mod publish;
+mod load;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use async_nats::jetstream::{consumer::{pull, Consumer}, Context};
+use tokio::signal;
+use tracing::{error, trace};
+use warden_stack::{Configuration, tracing::SdkTracerProvider};
+use futures_util::StreamExt;
+
+use crate::{cnfg::Nats, state::{AppHandle, AppState, Services}};
+
+
+pub async fn serve(
+ services: Services,
+ config: Configuration,
+ provider: SdkTracerProvider,
+) -> anyhow::Result<()> {
+ let state = Arc::new(AppState::new(services, config).await?);
+
+ tokio::select! {
+ _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {}
+ _ = shutdown_signal(provider) => {}
+ };
+
+ Ok(())
+}
+
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+ let config = Arc::clone(&state);
+ let (consumer, _) = tokio::join!(
+ get_or_create_stream(&state.services.jetstream, &state.config.nats),
+ load::get_routing_config(Arc::clone(&config))
+ );
+
+ let consumer = consumer?;
+
+ // Consume messages from the consumer
+ while let Some(Ok(message)) = consumer.messages().await?.next().await {
+ if let Err(e) = route::route(message, Arc::clone(&state)).await {
+ error!("{}", e.to_string());
+ }
+ }
+
+ Ok(())
+}
+
+async fn get_or_create_stream(
+ jetstream: &Context,
+ nats: &Nats,
+) -> anyhow::Result<Consumer<pull::Config>> {
+ trace!(name = ?nats.name, "getting or creating stream");
+ let stream = jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name: nats.name.to_string(),
+ subjects: nats.subjects.iter().map(Into::into).collect(),
+ max_messages: nats.max_messages,
+ ..Default::default()
+ })
+ .await?;
+ let durable = nats.durable_name.to_string();
+ // Get or create a pull-based consumer
+ Ok(stream
+ .get_or_create_consumer(
+ durable.as_ref(),
+ async_nats::jetstream::consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ ..Default::default()
+ },
+ )
+ .await?)
+}
+
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ },
+ _ = terminate => {
+ },
+ }
+ let _ = provider.shutdown();
+
+ Ok(())
+}