aboutsummaryrefslogtreecommitdiffstats
path: root/crates/typologies/src/processor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/typologies/src/processor.rs')
-rw-r--r--crates/typologies/src/processor.rs124
1 files changed, 124 insertions, 0 deletions
diff --git a/crates/typologies/src/processor.rs b/crates/typologies/src/processor.rs
new file mode 100644
index 0000000..a2a3b17
--- /dev/null
+++ b/crates/typologies/src/processor.rs
@@ -0,0 +1,124 @@
+mod driver;
+mod publish;
+mod reload;
+mod typology;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use async_nats::jetstream::{
+ Context,
+ consumer::{Consumer, pull},
+};
+use futures_util::{StreamExt, future};
+use tokio::signal;
+use tracing::{error, trace, warn};
+use warden_stack::{Configuration, tracing::SdkTracerProvider};
+
+use crate::{
+ cnfg::Nats,
+ state::{AppHandle, AppState, Services},
+};
+
+pub async fn serve(
+ services: Services,
+ config: Configuration,
+ provider: SdkTracerProvider,
+) -> anyhow::Result<()> {
+ let state = Arc::new(AppState::new(services, config).await?);
+
+ tokio::select! {
+ _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {}
+ _ = shutdown_signal(provider) => {}
+ };
+
+ Ok(())
+}
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+ let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
+
+ let limit = None;
+
+ consumer
+ .messages()
+ .await?
+ .for_each_concurrent(limit, |message| {
+ let state = Arc::clone(&state);
+ tokio::spawn(async move {
+ match message {
+ Ok(message) => {
+ if let Err(e) = typology::process_typology(message, state).await {
+ error!("{e:?}");
+ }
+ }
+ Err(e) => {
+ warn!("{e:?}");
+ }
+ }
+ });
+ future::ready(())
+ })
+ .await;
+
+ Ok(())
+}
+
+async fn get_or_create_stream(
+ jetstream: &Context,
+ nats: &Nats,
+) -> anyhow::Result<Consumer<pull::Config>> {
+ let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
+ .replace(".", "")
+ .replace("_", "");
+ trace!(name = name, subjects = ?nats.subjects, "getting or creating stream");
+
+ let stream = jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name,
+ subjects: nats.subjects.iter().map(Into::into).collect(),
+ max_messages: nats.max_messages,
+ ..Default::default()
+ })
+ .await?;
+ let durable = nats.durable_name.to_string();
+ // Get or create a pull-based consumer
+ Ok(stream
+ .get_or_create_consumer(
+ durable.as_ref(),
+ async_nats::jetstream::consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ ..Default::default()
+ },
+ )
+ .await?)
+}
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ },
+ _ = terminate => {
+ },
+ }
+ let _ = provider.shutdown();
+
+ Ok(())
+}