aboutsummaryrefslogtreecommitdiffstats
path: root/crates/aggregator
diff options
context:
space:
mode:
Diffstat (limited to 'crates/aggregator')
-rw-r--r--crates/aggregator/.dockerignore5
-rw-r--r--crates/aggregator/Cargo.toml51
-rw-r--r--crates/aggregator/Dockerfile27
-rw-r--r--crates/aggregator/aggregator.toml39
-rw-r--r--crates/aggregator/migrations/20250816113451_evaluation.sql6
-rw-r--r--crates/aggregator/src/cnfg.rs17
-rw-r--r--crates/aggregator/src/main.rs84
-rw-r--r--crates/aggregator/src/processor.rs105
-rw-r--r--crates/aggregator/src/processor/aggregate.rs167
-rw-r--r--crates/aggregator/src/state.rs42
10 files changed, 543 insertions, 0 deletions
diff --git a/crates/aggregator/.dockerignore b/crates/aggregator/.dockerignore
new file mode 100644
index 0000000..c8cd160
--- /dev/null
+++ b/crates/aggregator/.dockerignore
@@ -0,0 +1,5 @@
+/target
+.env
+.git
+.github
+/contrib
diff --git a/crates/aggregator/Cargo.toml b/crates/aggregator/Cargo.toml
new file mode 100644
index 0000000..28e4b01
--- /dev/null
+++ b/crates/aggregator/Cargo.toml
@@ -0,0 +1,51 @@
+[package]
+name = "warden-aggregator"
+version = "0.1.0"
+edition = "2024"
+license.workspace = true
+homepage.workspace = true
+documentation.workspace = true
+description.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+async-nats.workspace = true
+clap = { workspace = true, features = ["derive"] }
+config = { workspace = true, features = ["toml"] }
+futures-util.workspace = true
+opentelemetry.workspace = true
+opentelemetry-semantic-conventions.workspace = true
+prost.workspace = true
+serde = { workspace = true, features = ["derive", "rc"] }
+serde_json.workspace = true
+sqlx = { workspace = true, features = [
+ "json",
+ "macros",
+ "migrate",
+ "postgres",
+ "runtime-tokio",
+ "time",
+ "tls-rustls",
+ "uuid",
+] }
+tokio = { workspace = true, features = [
+ "macros",
+ "rt-multi-thread",
+ "signal",
+] }
+tonic.workspace = true
+tracing.workspace = true
+tracing-opentelemetry.workspace = true
+uuid = { workspace = true, features = ["v7"] }
+warden-core = { workspace = true, features = [
+ "message",
+ "serde",
+ "time",
+] }
+warden-stack = { workspace = true, features = [
+ "cache",
+ "nats-jetstream",
+ "opentelemetry",
+ "postgres",
+ "tracing-loki",
+] }
diff --git a/crates/aggregator/Dockerfile b/crates/aggregator/Dockerfile
new file mode 100644
index 0000000..73560e6
--- /dev/null
+++ b/crates/aggregator/Dockerfile
@@ -0,0 +1,27 @@
+FROM rust:1.89.0-slim AS builder
+
+ENV SQLX_OFFLINE=true
+
+RUN rustup target add x86_64-unknown-linux-musl
+RUN apt update && apt install -y musl-tools musl-dev protobuf-compiler curl
+RUN update-ca-certificates
+
+WORKDIR /usr/src/app
+
+RUN mkdir -p crates
+
+COPY ./.sqlx .sqlx
+COPY ./crates/aggregator crates/aggregator
+COPY ./lib lib
+COPY ./Cargo.toml .
+COPY ./Cargo.lock .
+
+RUN cargo fetch
+
+COPY ./proto proto
+
+RUN cargo build --target x86_64-unknown-linux-musl --release
+
+FROM scratch
+COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/warden-aggregator ./
+CMD [ "./warden-aggregator" ]
diff --git a/crates/aggregator/aggregator.toml b/crates/aggregator/aggregator.toml
new file mode 100644
index 0000000..b9a0695
--- /dev/null
+++ b/crates/aggregator/aggregator.toml
@@ -0,0 +1,39 @@
+[application]
+env = "development"
+
+[monitoring]
+log-level = "warden_aggregator=trace,info"
+opentelemetry-endpoint = "http://localhost:4317"
+loki-endpoint = "http://localhost:3100"
+
+[misc.nats]
+stream-name = "tadp"
+subjects = ["tadp.>"]
+durable-name = "tadp"
+
+[database]
+pool_size = 100
+port = 5432
+name = "evaluations"
+host = "localhost"
+password = "password"
+user = "postgres"
+
+[nats]
+hosts = ["nats://localhost:4222"]
+
+[cache]
+dsn = "redis://localhost:6379"
+pooled = true
+type = "non-clustered" # clustered, non-clustered or sentinel
+max-connections = 100
+
+[cache.sentinel]
+master-name = "mymaster"
+nodes = [
+ { host = "127.0.0.1", port = 26379 },
+ { host = "127.0.0.2", port = 26379 },
+ { host = "127.0.0.3", port = 26379 },
+]
+
+# vim:ft=toml
diff --git a/crates/aggregator/migrations/20250816113451_evaluation.sql b/crates/aggregator/migrations/20250816113451_evaluation.sql
new file mode 100644
index 0000000..c8c0ac7
--- /dev/null
+++ b/crates/aggregator/migrations/20250816113451_evaluation.sql
@@ -0,0 +1,6 @@
+-- Add migration script here
+create table evaluation (
+ id uuid primary key,
+ document jsonb not null,
+ created_at timestamptz default now()
+);
diff --git a/crates/aggregator/src/cnfg.rs b/crates/aggregator/src/cnfg.rs
new file mode 100644
index 0000000..7c7aaf4
--- /dev/null
+++ b/crates/aggregator/src/cnfg.rs
@@ -0,0 +1,17 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Clone)]
+pub struct LocalConfig {
+ pub nats: NatsConfig,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct NatsConfig {
+ #[serde(rename = "stream-name")]
+ pub name: Arc<str>,
+ pub subjects: Arc<[String]>,
+ pub durable_name: Arc<str>,
+}
diff --git a/crates/aggregator/src/main.rs b/crates/aggregator/src/main.rs
new file mode 100644
index 0000000..62af544
--- /dev/null
+++ b/crates/aggregator/src/main.rs
@@ -0,0 +1,84 @@
+mod cnfg;
+mod processor;
+mod state;
+
+use anyhow::Result;
+use clap::Parser;
+use tracing::error;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+
+use crate::state::AppState;
+
+/// warden-aggregator
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// Path to config file
+ #[arg(short, long)]
+ config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args = Args::parse();
+ let config = include_str!("../aggregator.toml");
+
+ let mut config = config::Config::builder()
+ .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+ if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+ config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+ };
+
+ let mut config: Configuration = config.build()?.try_deserialize()?;
+ config.application.name = env!("CARGO_CRATE_NAME").into();
+ config.application.version = env!("CARGO_PKG_VERSION").into();
+
+ let tracing = Tracing::builder()
+ .opentelemetry(&config.application, &config.monitoring)?
+ .loki(&config.application, &config.monitoring)?
+ .build(&config.monitoring);
+
+ let provider = tracing.otel_provider;
+
+ tokio::spawn(tracing.loki_task);
+
+ let mut services = Services::builder()
+ .postgres(&config.database)
+ .await
+ .inspect_err(|e| error!("database: {e}"))?
+ .cache(&config.cache)
+ .await
+ .inspect_err(|e| error!("cache: {e}"))?
+ .nats_jetstream(&config.nats)
+ .await
+ .inspect_err(|e| error!("nats: {e}"))?
+ .build();
+
+ let postgres = services
+ .postgres
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("database is not ready"))?;
+
+ let cache = services
+ .cache
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?;
+
+ let jetstream = services
+ .jetstream
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
+
+ let services = state::Services {
+ postgres,
+ cache,
+ jetstream,
+ };
+
+ let state = AppState::create(services, &config).await?;
+
+ processor::serve(state, provider).await?;
+
+ Ok(())
+}
diff --git a/crates/aggregator/src/processor.rs b/crates/aggregator/src/processor.rs
new file mode 100644
index 0000000..3a7c8ac
--- /dev/null
+++ b/crates/aggregator/src/processor.rs
@@ -0,0 +1,105 @@
+mod aggregate;
+
+use anyhow::Result;
+use async_nats::{
+ self,
+ jetstream::{
+ Context,
+ consumer::{Consumer, pull::Config},
+ },
+};
+use futures_util::{StreamExt as _, future};
+use tokio::signal;
+use tracing::{debug, error, info};
+use warden_stack::tracing::SdkTracerProvider;
+
+use crate::{cnfg::NatsConfig, state::AppHandle};
+
+pub async fn serve(state: AppHandle, provider: SdkTracerProvider) -> Result<()> {
+ tokio::select! {
+ _ = run(state) => {}
+ _ = shutdown_signal(provider) => {}
+ };
+ Ok(())
+}
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+ let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
+
+ let limit = None;
+
+ consumer
+ .messages()
+ .await?
+ .for_each_concurrent(limit, |message| {
+ let state = state.clone();
+ tokio::spawn(async move {
+ if let Ok(message) = message
+ && let Err(e) = aggregate::handle(message, state).await
+ {
+ error!("{}", e.to_string());
+ }
+ });
+ future::ready(())
+ })
+ .await;
+
+ Ok(())
+}
+
+async fn get_or_create_stream(
+ jetstream: &Context,
+ nats: &NatsConfig,
+) -> anyhow::Result<Consumer<Config>> {
+ debug!(name = ?nats.name, subjects = ?nats.subjects, "getting or creating stream");
+ let stream = jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name: nats.name.to_string(),
+ subjects: nats.subjects.iter().map(|v| v.to_string()).collect(),
+ ..Default::default()
+ })
+ .await?;
+ let durable = nats.durable_name.to_string();
+ // Get or create a pull-based consumer
+ let consumer = stream
+ .get_or_create_consumer(
+ durable.as_ref(),
+ async_nats::jetstream::consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ info!(subject = ?nats.subjects, "ready to receive messages");
+ Ok(consumer)
+}
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ },
+ _ = terminate => {
+ },
+ }
+ let _ = provider.shutdown();
+
+ Ok(())
+}
diff --git a/crates/aggregator/src/processor/aggregate.rs b/crates/aggregator/src/processor/aggregate.rs
new file mode 100644
index 0000000..d9d8482
--- /dev/null
+++ b/crates/aggregator/src/processor/aggregate.rs
@@ -0,0 +1,167 @@
+use async_nats::jetstream::Message;
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use uuid::Uuid;
+use warden_core::{
+ configuration::routing::RoutingConfiguration,
+ message::{AggregationResult, Payload, TypologyResult, payload::Transaction},
+};
+use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor};
+
+use crate::state::AppHandle;
+
+#[instrument(skip(message, state), err(Debug))]
+pub async fn handle(message: Message, state: AppHandle) -> anyhow::Result<()> {
+ let span = Span::current();
+
+ if let Some(ref headers) = message.headers {
+ let context = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&extractor::HeaderMap(headers))
+ });
+ span.set_parent(context);
+ };
+
+ let mut payload: Payload = prost::Message::decode(message.payload.as_ref())?;
+
+ if let (Some(ref typology_result), Some(Transaction::Pacs002(document)), Some(routing)) = (
+ payload.typology_result.take(),
+ &payload.transaction,
+ &payload.routing,
+ ) {
+ let cache_key = format!("tadp_{}_tp", document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id);
+ let (typology_results, review) =
+ handle_typologies(typology_result, &state, &cache_key, routing).await?;
+
+ if typology_results
+ .len()
+ .ne(&routing.messages[0].typologies.len())
+ {
+ trace!("insufficient typology results for this typology. waiting for more");
+ return Ok(());
+ }
+
+ let aggs = AggregationResult {
+ id: routing.messages[0].id.to_owned(),
+ version: routing.messages[0].version.to_owned(),
+ typology_results,
+ review,
+ };
+
+ payload.aggregation_result = Some(aggs);
+ let _ = payload.rule_result.take();
+
+ let id = Uuid::now_v7();
+ debug!(%id, "inserting evaluation result");
+
+ let span = info_span!("create.evaluations.evaluation");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "transaction");
+ span.set_attribute("otel.kind", "client");
+
+ sqlx::query!(
+ "insert into evaluation (id, document) values ($1, $2)",
+ id,
+ sqlx::types::Json(&payload) as _
+ )
+ .execute(&state.services.postgres)
+ .instrument(span)
+ .await?;
+ info!(%id, "evaluation added");
+
+ let mut cache = state.services.cache.get().await?;
+ let span = Span::current();
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "del");
+ span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
+ span.set_attribute("otel.kind", "client");
+ debug!("cache cleared");
+
+ cache.del::<_, ()>(&cache_key).await?;
+ } else {
+ error!("payload has insufficient data");
+ }
+
+ let span = info_span!("nats.ack");
+ message
+ .ack()
+ .instrument(span)
+ .await
+ .map_err(|_| anyhow::anyhow!("ack error"))?;
+
+ Ok(())
+}
+
+async fn handle_typologies(
+ payload: &TypologyResult,
+ state: &AppHandle,
+ cache_key: &str,
+ routing: &RoutingConfiguration,
+) -> anyhow::Result<(Vec<TypologyResult>, bool)> {
+ let mut cache = state.services.cache.get().await?;
+ let bytes = prost::Message::encode_to_vec(payload);
+
+ let span = Span::current();
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "sadd+scard");
+ span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
+ span.set_attribute("otel.kind", "client");
+
+ debug!("saving typology result");
+ let res = warden_stack::redis::pipe()
+ .sadd::<_, _>(cache_key, bytes)
+ .ignore()
+ .scard(cache_key)
+ .query_async::<Vec<usize>>(&mut cache)
+ .instrument(span)
+ .await?;
+
+ let typology_count = res
+ .first()
+ .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
+
+ let typologies = &routing.messages[0].typologies;
+
+ if typology_count.lt(&typologies.len()) {
+ return Ok((vec![], false));
+ }
+
+ debug!("getting all typology results");
+ let span = Span::current();
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "smembers");
+ span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
+ span.set_attribute("otel.kind", "client");
+ let res = cache
+ .smembers::<_, Vec<Vec<Vec<u8>>>>(cache_key)
+ .instrument(span)
+ .await?;
+
+ let members = res
+ .first()
+ .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
+
+ let typologies: Result<Vec<TypologyResult>, _> = members
+ .iter()
+ .map(|value| {
+ <TypologyResult as prost::Message>::decode(value.as_ref()).map_err(anyhow::Error::new)
+ })
+ .collect();
+
+ let typologies = typologies?;
+
+ let mut review = false;
+ for typology in routing.messages[0].typologies.iter() {
+ if let Some(value) = typologies
+ .iter()
+ .find(|value| value.id.eq(&typology.id) && value.version.eq(&typology.version))
+ && value.review
+ {
+ review = true;
+ }
+ }
+
+ Ok((typologies, review))
+}
diff --git a/crates/aggregator/src/state.rs b/crates/aggregator/src/state.rs
new file mode 100644
index 0000000..ac4f574
--- /dev/null
+++ b/crates/aggregator/src/state.rs
@@ -0,0 +1,42 @@
+use sqlx::PgPool;
+use std::{ops::Deref, sync::Arc};
+
+use async_nats::jetstream::Context;
+use warden_stack::{Configuration, cache::RedisManager};
+
+use crate::cnfg::LocalConfig;
+
+#[derive(Clone)]
+pub struct Services {
+ pub jetstream: Context,
+ pub cache: RedisManager,
+ pub postgres: PgPool,
+}
+
+#[derive(Clone)]
+pub struct AppState {
+ pub services: Services,
+ pub config: LocalConfig,
+}
+
+#[derive(Clone)]
+pub struct AppHandle(pub Arc<AppState>);
+
+impl Deref for AppHandle {
+ type Target = Arc<AppState>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl AppState {
+ pub async fn create(
+ services: Services,
+ configuration: &Configuration,
+ ) -> anyhow::Result<AppHandle> {
+ let config = serde_json::from_value(configuration.misc.clone())?;
+
+ Ok(AppHandle(Arc::new(Self { services, config })))
+ }
+}