aboutsummaryrefslogtreecommitdiffstats
path: root/crates/typologies/src
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-17 20:02:49 +0200
committerGitHub <noreply@github.com>2025-08-17 20:02:49 +0200
commit73d7bab8844bb21c7a9143c30800c2d11d411e42 (patch)
tree955290bd2bded56b534738d6320216fbeeb708cb /crates/typologies/src
parent725739985d853b07d73fa7fcd6db1f2f1b0000b6 (diff)
downloadwarden-73d7bab8844bb21c7a9143c30800c2d11d411e42.tar.bz2
warden-73d7bab8844bb21c7a9143c30800c2d11d411e42.zip
feat: typology processor (#8)
Diffstat (limited to 'crates/typologies/src')
-rw-r--r--crates/typologies/src/cnfg.rs27
-rw-r--r--crates/typologies/src/main.rs66
-rw-r--r--crates/typologies/src/processor.rs124
-rw-r--r--crates/typologies/src/processor/driver.rs40
-rw-r--r--crates/typologies/src/processor/publish.rs44
-rw-r--r--crates/typologies/src/processor/reload.rs71
-rw-r--r--crates/typologies/src/processor/typology.rs180
-rw-r--r--crates/typologies/src/processor/typology/aggregate_rules.rs202
-rw-r--r--crates/typologies/src/processor/typology/evaluate_expression.rs171
-rw-r--r--crates/typologies/src/state.rs55
10 files changed, 978 insertions, 2 deletions
diff --git a/crates/typologies/src/cnfg.rs b/crates/typologies/src/cnfg.rs
new file mode 100644
index 0000000..6086f46
--- /dev/null
+++ b/crates/typologies/src/cnfg.rs
@@ -0,0 +1,27 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct LocalConfig {
+ pub config_endpoint: Arc<str>,
+ pub nats: Nats,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct Nats {
+ pub subjects: Arc<[String]>,
+ pub destination_prefix: Arc<str>,
+ pub max_messages: i64,
+ pub durable_name: Arc<str>,
+ pub config: ConfigNats,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct ConfigNats {
+ pub stream: Arc<str>,
+ pub reload_subject: Arc<str>,
+}
diff --git a/crates/typologies/src/main.rs b/crates/typologies/src/main.rs
index e7a11a9..ea7843a 100644
--- a/crates/typologies/src/main.rs
+++ b/crates/typologies/src/main.rs
@@ -1,3 +1,65 @@
-fn main() {
- println!("Hello, world!");
+mod cnfg;
+mod processor;
+mod state;
+
+use anyhow::Result;
+use clap::Parser;
+use tracing::error;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+
+/// typologies
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// Path to config file
+ #[arg(short, long)]
+ config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args = Args::parse();
+ let config = include_str!("../typologies.toml");
+
+ let mut config = config::Config::builder()
+ .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+ if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+ config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+ };
+
+ let mut config: Configuration = config.build()?.try_deserialize()?;
+ config.application.name = env!("CARGO_CRATE_NAME").into();
+ config.application.version = env!("CARGO_PKG_VERSION").into();
+
+ let tracing = Tracing::builder()
+ .opentelemetry(&config.application, &config.monitoring)?
+ .loki(&config.application, &config.monitoring)?
+ .build(&config.monitoring);
+
+ let provider = tracing.otel_provider;
+
+ tokio::spawn(tracing.loki_task);
+
+ let mut services = Services::builder()
+ .nats_jetstream(&config.nats)
+ .await
+ .inspect_err(|e| error!("nats: {e}"))?
+ .build();
+
+ let jetstream = services
+ .jetstream
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
+
+ let cache = services
+ .cache
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?;
+
+ let services = state::Services { jetstream, cache };
+
+ processor::serve(services, config, provider)
+ .await
+ .inspect_err(|e| error!("{e}"))
}
diff --git a/crates/typologies/src/processor.rs b/crates/typologies/src/processor.rs
new file mode 100644
index 0000000..a2a3b17
--- /dev/null
+++ b/crates/typologies/src/processor.rs
@@ -0,0 +1,124 @@
+mod driver;
+mod publish;
+mod reload;
+mod typology;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use async_nats::jetstream::{
+ Context,
+ consumer::{Consumer, pull},
+};
+use futures_util::{StreamExt, future};
+use tokio::signal;
+use tracing::{error, trace, warn};
+use warden_stack::{Configuration, tracing::SdkTracerProvider};
+
+use crate::{
+ cnfg::Nats,
+ state::{AppHandle, AppState, Services},
+};
+
+pub async fn serve(
+ services: Services,
+ config: Configuration,
+ provider: SdkTracerProvider,
+) -> anyhow::Result<()> {
+ let state = Arc::new(AppState::new(services, config).await?);
+
+ tokio::select! {
+ _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {}
+ _ = shutdown_signal(provider) => {}
+ };
+
+ Ok(())
+}
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+ let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
+
+ let limit = None;
+
+ consumer
+ .messages()
+ .await?
+ .for_each_concurrent(limit, |message| {
+ let state = Arc::clone(&state);
+ tokio::spawn(async move {
+ match message {
+ Ok(message) => {
+ if let Err(e) = typology::process_typology(message, state).await {
+ error!("{e:?}");
+ }
+ }
+ Err(e) => {
+ warn!("{e:?}");
+ }
+ }
+ });
+ future::ready(())
+ })
+ .await;
+
+ Ok(())
+}
+
+async fn get_or_create_stream(
+ jetstream: &Context,
+ nats: &Nats,
+) -> anyhow::Result<Consumer<pull::Config>> {
+ let name = format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
+ .replace(".", "")
+ .replace("_", "");
+ trace!(name = name, subjects = ?nats.subjects, "getting or creating stream");
+
+ let stream = jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name,
+ subjects: nats.subjects.iter().map(Into::into).collect(),
+ max_messages: nats.max_messages,
+ ..Default::default()
+ })
+ .await?;
+ let durable = nats.durable_name.to_string();
+ // Get or create a pull-based consumer
+ Ok(stream
+ .get_or_create_consumer(
+ durable.as_ref(),
+ async_nats::jetstream::consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ ..Default::default()
+ },
+ )
+ .await?)
+}
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ },
+ _ = terminate => {
+ },
+ }
+ let _ = provider.shutdown();
+
+ Ok(())
+}
diff --git a/crates/typologies/src/processor/driver.rs b/crates/typologies/src/processor/driver.rs
new file mode 100644
index 0000000..d150620
--- /dev/null
+++ b/crates/typologies/src/processor/driver.rs
@@ -0,0 +1,40 @@
+use tonic::IntoRequest;
+use warden_core::configuration::typology::{TypologyConfiguration, TypologyConfigurationRequest};
+
+use crate::state::AppHandle;
+
+pub trait GetTypologyConfiguration {
+ fn get_typology_config(
+ &self,
+ typology_key: TypologyConfigurationRequest,
+ ) -> impl std::future::Future<Output = anyhow::Result<TypologyConfiguration>> + Send;
+}
+
+impl GetTypologyConfiguration for AppHandle {
+ async fn get_typology_config(
+ &self,
+ typology_key: TypologyConfigurationRequest,
+ ) -> anyhow::Result<TypologyConfiguration> {
+ {
+ let local_cache = self.local_cache.read().await;
+ if let Some(result) = local_cache.get(&typology_key).await.map(Ok) {
+ return result;
+ }
+ }
+
+ let local_cache = self.local_cache.write().await;
+ let mut client = self.query_typology_client.clone();
+
+ let value = client
+ .get_typology_configuration(typology_key.clone().into_request())
+ .await?
+ .into_inner()
+ .configuration
+ .ok_or_else(|| anyhow::anyhow!("configuration unavailable"))?;
+ local_cache
+ .insert(typology_key.clone(), value.clone())
+ .await;
+
+ Ok(value)
+ }
+}
diff --git a/crates/typologies/src/processor/publish.rs b/crates/typologies/src/processor/publish.rs
new file mode 100644
index 0000000..b031bf3
--- /dev/null
+++ b/crates/typologies/src/processor/publish.rs
@@ -0,0 +1,44 @@
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing::{Instrument, Span, debug, info_span, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::message::Payload;
+use warden_stack::tracing::telemetry::nats::injector;
+
+use crate::state::AppHandle;
+
+pub(crate) async fn to_tadp(
+ subject: &str,
+ state: AppHandle,
+ payload: Payload,
+) -> anyhow::Result<()> {
+ // send transaction to next with nats
+ let subject = format!("{}.{}", state.config.nats.destination_prefix, subject);
+ debug!(subject = ?subject, "publishing");
+
+ let payload = prost::Message::encode_to_vec(&payload);
+
+ let mut headers = async_nats::HeaderMap::new();
+
+ let cx = Span::current().context();
+
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers))
+ });
+
+ let span = info_span!("nats.publish");
+ span.set_attribute(
+ attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
+ subject.to_string(),
+ );
+ span.set_attribute(attribute::MESSAGING_SYSTEM, "nats");
+ state
+ .services
+ .jetstream
+ .publish_with_headers(subject.clone(), headers, payload.into())
+ .instrument(span)
+ .await
+ .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?;
+
+ Ok(())
+}
diff --git a/crates/typologies/src/processor/reload.rs b/crates/typologies/src/processor/reload.rs
new file mode 100644
index 0000000..fac4c40
--- /dev/null
+++ b/crates/typologies/src/processor/reload.rs
@@ -0,0 +1,71 @@
+use async_nats::jetstream::consumer;
+use futures_util::StreamExt;
+use prost::Message as _;
+use tracing::{error, info, trace};
+use uuid::Uuid;
+use warden_core::configuration::{ConfigKind, ReloadEvent, typology::TypologyConfigurationRequest};
+
+use crate::state::AppHandle;
+
+pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
+ let id = Uuid::now_v7().to_string();
+ info!(durable = id, "listening for configuration changes");
+
+ let durable = &id;
+ let consumer = state
+ .services
+ .jetstream
+ .get_stream(state.config.nats.config.stream.to_string())
+ .await?
+ .get_or_create_consumer(
+ durable,
+ consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ filter_subject: state.config.nats.config.reload_subject.to_string(),
+ deliver_policy: consumer::DeliverPolicy::LastPerSubject,
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let mut messages = consumer.messages().await?;
+ while let Some(value) = messages.next().await {
+ match value {
+ Ok(message) => {
+ trace!("got reload cache event");
+ if let Ok(res) = ReloadEvent::decode(message.payload.as_ref())
+ && let Ok(kind) = ConfigKind::try_from(res.kind)
+ {
+ match kind {
+ ConfigKind::Typology => {
+ let local_cache = state.local_cache.write().await;
+ let id = res.id();
+ let version = res.version();
+ trace!(
+ id = id,
+ ver = version,
+ "update triggered, invalidating typology config"
+ );
+ let key = TypologyConfigurationRequest {
+ id: id.to_string(),
+ version: version.to_string(),
+ };
+
+ local_cache.invalidate(&key).await;
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ _ => {
+ trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging");
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ }
+ }
+ }
+ Err(e) => {
+ error!("{e:?}")
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/crates/typologies/src/processor/typology.rs b/crates/typologies/src/processor/typology.rs
new file mode 100644
index 0000000..b1b2592
--- /dev/null
+++ b/crates/typologies/src/processor/typology.rs
@@ -0,0 +1,180 @@
+mod aggregate_rules;
+mod evaluate_expression;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use opentelemetry::global;
+use prost::Message;
+use tracing::{Instrument, Span, error, info, info_span, instrument, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::{
+ configuration::{routing::RoutingConfiguration, typology::TypologyConfigurationRequest},
+ message::{Payload, RuleResult, TypologyResult},
+};
+use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor};
+
+use crate::{
+ processor::{driver::GetTypologyConfiguration as _, publish},
+ state::AppHandle,
+};
+
+#[instrument(skip(message, state), err(Debug))]
+pub async fn process_typology(
+ message: async_nats::jetstream::Message,
+ state: AppHandle,
+) -> Result<()> {
+ let span = Span::current();
+
+ if let Some(ref headers) = message.headers {
+ let context = global::get_text_map_propagator(|propagator| {
+ propagator.extract(&extractor::HeaderMap(headers))
+ });
+ span.set_parent(context);
+ };
+
+ let payload: Payload = Message::decode(message.payload.as_ref())?;
+
+ if payload.transaction.is_none() {
+ warn!("transaction is empty - proceeding with ack");
+ let _ = message.ack().await;
+ return Ok(());
+ }
+
+ let transaction = payload.transaction.as_ref().expect("to have returned");
+
+ match transaction {
+ warden_core::message::payload::Transaction::Pacs008(_) => {
+ warn!("Pacs008 is unsupported on this version: this should be unreachable");
+ }
+ warden_core::message::payload::Transaction::Pacs002(pacs002_document) => {
+ let key = format!(
+ "tp_{}",
+ pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id
+ );
+
+ let rule_result = &payload
+ .rule_result
+ .as_ref()
+ .expect("rule result should be here");
+ let rule_results = cache_and_get_all(&key, rule_result, Arc::clone(&state)).await?;
+
+ let routing = payload
+ .routing
+ .as_ref()
+ .expect("routing missing from payload");
+
+ let (mut typology_result, _rule_count) =
+ aggregate_rules::aggregate_rules(&rule_results, routing, rule_result)?;
+
+ let _ = evaluate_typology(&mut typology_result, routing, payload.clone(), &key, state)
+ .await
+ .inspect_err(|e| error!("{e}"));
+ }
+ };
+
+ let span = info_span!("nats.ack");
+ message
+ .ack()
+ .instrument(span)
+ .await
+ .map_err(|_| anyhow::anyhow!("ack error"))?;
+
+ Ok(())
+}
+
+#[instrument(skip(typology_result, routing, payload, state), err(Debug))]
+async fn evaluate_typology(
+ typology_result: &mut [TypologyResult],
+ routing: &RoutingConfiguration,
+ mut payload: Payload,
+ key: &str,
+ state: AppHandle,
+) -> Result<()> {
+ for typology_result in typology_result.iter_mut() {
+ let handle = Arc::clone(&state);
+ let routing_rules = routing.messages[0].typologies.iter().find(|typology| {
+ typology.version.eq(&typology_result.version) && typology.id.eq(&typology_result.id)
+ });
+ let typology_result_rules = &typology_result.rule_results;
+
+ if routing_rules.is_some()
+ && typology_result_rules.len() < routing_rules.unwrap().rules.len()
+ {
+ continue;
+ }
+
+ let typology_config = handle
+ .get_typology_config(TypologyConfigurationRequest {
+ id: typology_result.id.to_owned(),
+ version: typology_result.version.to_owned(),
+ })
+ .await?;
+
+ let result = evaluate_expression::evaluate_expression(typology_result, &typology_config)?;
+
+ typology_result.result = result;
+
+ let workflow = typology_config
+ .workflow
+ .as_ref()
+ .expect("no workflow in config");
+
+ if workflow.interdiction_threshold.is_some() {
+ typology_result.workflow.replace(*workflow);
+ }
+ typology_result.review = result.ge(&typology_config.workflow.unwrap().alert_threshold);
+
+ payload.typology_result = Some(typology_result.to_owned());
+
+ let is_interdicting = typology_config
+ .workflow
+ .unwrap()
+ .interdiction_threshold
+ .is_some_and(|value| value > 0.0 && result >= value);
+
+ if is_interdicting {
+ typology_result.review = true;
+ }
+
+ if result >= typology_config.workflow.unwrap().alert_threshold {
+ info!("alerting");
+ }
+
+ let subj = handle.config.nats.destination_prefix.to_string();
+ let _ = publish::to_tadp(&subj, handle, payload.clone())
+ .await
+ .inspect_err(|e| error!("{e}"));
+
+ let mut c = state.services.cache.get().await?;
+ c.del::<_, ()>(key).await?;
+ }
+
+ Ok(())
+}
+
+async fn cache_and_get_all(
+ cache_key: &str,
+ rule_result: &RuleResult,
+ state: AppHandle,
+) -> Result<Vec<RuleResult>> {
+ let mut cache = state.services.cache.get().await?;
+
+ let bytes = prost::Message::encode_to_vec(rule_result);
+
+ let res = warden_stack::redis::pipe()
+ .sadd::<_, _>(cache_key, bytes)
+ .ignore()
+ .smembers(cache_key)
+ .query_async::<Vec<Vec<Vec<u8>>>>(&mut cache)
+ .await?;
+
+ let members = res
+ .first()
+ .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
+
+ members
+ .iter()
+ .map(|value| RuleResult::decode(value.as_ref()).map_err(anyhow::Error::new))
+ .collect()
+}
diff --git a/crates/typologies/src/processor/typology/aggregate_rules.rs b/crates/typologies/src/processor/typology/aggregate_rules.rs
new file mode 100644
index 0000000..8f92dae
--- /dev/null
+++ b/crates/typologies/src/processor/typology/aggregate_rules.rs
@@ -0,0 +1,202 @@
+use anyhow::Result;
+use std::collections::HashSet;
+
+use warden_core::{
+ configuration::routing::RoutingConfiguration,
+ message::{RuleResult, TypologyResult},
+};
+
+pub(super) fn aggregate_rules(
+ rule_results: &[RuleResult],
+ routing: &RoutingConfiguration,
+ rule_result: &RuleResult,
+) -> Result<(Vec<TypologyResult>, usize)> {
+ let mut typology_result: Vec<TypologyResult> = vec![];
+ let mut all_rules_set = HashSet::new();
+
+ routing.messages.iter().for_each(|message| {
+ message.typologies.iter().for_each(|typology| {
+ let mut set = HashSet::new();
+
+ for rule in typology.rules.iter() {
+ set.insert((&rule.id, rule.version()));
+ all_rules_set.insert((&rule.id, rule.version()));
+ }
+
+ if !set.contains(&(&rule_result.id, rule_result.version.as_str())) {
+ return;
+ }
+
+ let rule_results: Vec<_> = rule_results
+ .iter()
+ .filter_map(|value| {
+ if set.contains(&(&value.id, &value.version)) {
+ Some(value.to_owned())
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ if !rule_results.is_empty() {
+ typology_result.push(TypologyResult {
+ id: typology.id.to_owned(),
+ version: typology.version.to_owned(),
+ rule_results,
+ ..Default::default()
+ });
+ }
+ });
+ });
+
+ Ok((typology_result, all_rules_set.len()))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use warden_core::{
+ configuration::routing::{Message, RoutingConfiguration, Rule, Typology},
+ message::RuleResult,
+ };
+
+ fn create_rule(id: &str, version: &str) -> Rule {
+ Rule {
+ id: id.to_string(),
+ version: Some(version.to_string()),
+ }
+ }
+
+ fn create_rule_result(id: &str, version: &str) -> RuleResult {
+ RuleResult {
+ id: id.to_string(),
+ version: version.to_string(),
+ ..Default::default()
+ }
+ }
+
+ #[test]
+ fn returns_empty_when_no_matching_typology() {
+ let routing = RoutingConfiguration {
+ messages: vec![Message {
+ typologies: vec![Typology {
+ id: "T1".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R1", "v1")],
+ }],
+ ..Default::default()
+ }],
+ ..Default::default()
+ };
+
+ let rule_results = vec![create_rule_result("R2", "v1")];
+ let input_rule = create_rule_result("R2", "v1");
+
+ let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();
+ assert!(result.is_empty());
+ assert_eq!(count, 1); // one rule in routing
+ }
+
+ #[test]
+ fn returns_typology_with_matching_rule() {
+ let routing = RoutingConfiguration {
+ messages: vec![Message {
+ typologies: vec![Typology {
+ id: "T1".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")],
+ }],
+ ..Default::default()
+ }],
+ ..Default::default()
+ };
+
+ let rule_results = vec![
+ create_rule_result("R1", "v1"),
+ create_rule_result("R2", "v1"),
+ ];
+
+ let input_rule = create_rule_result("R1", "v1");
+
+ let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();
+
+ assert_eq!(count, 2); // R1, R2
+ assert_eq!(result.len(), 1);
+ assert_eq!(result[0].id, "T1");
+ assert_eq!(result[0].rule_results.len(), 2);
+ }
+
+ #[test]
+ fn ignores_unrelated_rules_in_rule_results() {
+ let routing = RoutingConfiguration {
+ messages: vec![Message {
+ typologies: vec![Typology {
+ id: "T1".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R1", "v1")],
+ }],
+ ..Default::default()
+ }],
+ ..Default::default()
+ };
+
+ let rule_results = vec![
+ create_rule_result("R1", "v1"),
+ create_rule_result("R99", "v1"), // unrelated
+ ];
+
+ let input_rule = create_rule_result("R1", "v1");
+
+ let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();
+
+ assert_eq!(count, 1);
+ assert_eq!(result.len(), 1);
+ assert_eq!(result[0].rule_results.len(), 1);
+ assert_eq!(result[0].rule_results[0].id, "R1");
+ }
+
+ #[test]
+ fn handles_multiple_messages_and_typologies() {
+ let routing = RoutingConfiguration {
+ messages: vec![
+ Message {
+ typologies: vec![
+ Typology {
+ id: "T1".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R1", "v1")],
+ },
+ Typology {
+ id: "T2".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R2", "v1")],
+ },
+ ],
+ ..Default::default()
+ },
+ Message {
+ typologies: vec![Typology {
+ id: "T3".to_string(),
+ version: "v1".to_string(),
+ rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")],
+ }],
+ ..Default::default()
+ },
+ ],
+ ..Default::default()
+ };
+
+ let rule_results = vec![
+ create_rule_result("R1", "v1"),
+ create_rule_result("R2", "v1"),
+ ];
+ let input_rule = create_rule_result("R1", "v1");
+
+ let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap();
+
+ assert_eq!(count, 2); // R1, R2 appear in multiple typologies, but unique rules are 2
+ assert_eq!(result.len(), 2); // T1 (R1) and T3 (R1 & R2)
+ assert_eq!(result[0].id, "T1");
+ assert_eq!(result[1].id, "T3");
+ }
+}
diff --git a/crates/typologies/src/processor/typology/evaluate_expression.rs b/crates/typologies/src/processor/typology/evaluate_expression.rs
new file mode 100644
index 0000000..844011e
--- /dev/null
+++ b/crates/typologies/src/processor/typology/evaluate_expression.rs
@@ -0,0 +1,171 @@
+use anyhow::Result;
+use tracing::warn;
+use warden_core::{configuration::typology::TypologyConfiguration, message::TypologyResult};
+
+pub(super) fn evaluate_expression(
+ typology_result: &mut TypologyResult,
+ typology_config: &TypologyConfiguration,
+) -> Result<f64> {
+ let mut to_return = 0.0;
+ let expression = typology_config
+ .expression
+ .as_ref()
+ .expect("expression is missing");
+
+ let rule_values = &typology_config.rules;
+
+ for rule in expression.terms.iter() {
+ let rule_result = typology_result
+ .rule_results
+ .iter()
+ .find(|value| value.id.eq(&rule.id) && value.version.eq(&rule.version));
+
+ if rule_result.is_none() {
+ warn!(term = ?rule, "could not find rule result for typology term");
+ return Ok(Default::default());
+ }
+
+ let rule_result = rule_result.expect("checked and is some");
+
+ let weight = rule_values
+ .iter()
+ .filter_map(|rv| {
+ if !(rv.id.eq(&rule_result.id) && rv.version.eq(&rule_result.version)) {
+ None
+ } else {
+ rv.wghts.iter().find_map(|value| {
+ match value.r#ref.eq(&rule_result.sub_rule_ref) {
+ true => Some(value.wght),
+ false => None,
+ }
+ })
+ }
+ })
+ .next();
+
+ if weight.is_none() {
+ warn!(rule = ?rule, "could not find a weight for the matching rule");
+ }
+ let weight = weight.unwrap_or_default();
+
+ to_return = match expression.operator() {
+ warden_core::configuration::typology::Operator::Add => to_return + weight,
+ warden_core::configuration::typology::Operator::Multiply => to_return * weight,
+ warden_core::configuration::typology::Operator::Subtract => to_return - weight,
+ warden_core::configuration::typology::Operator::Divide => {
+ if weight.ne(&0.0) {
+ to_return / weight
+ } else {
+ to_return
+ }
+ }
+ };
+ }
+ Ok(to_return)
+}
+
+#[cfg(test)]
+mod tests {
+ use warden_core::{
+ configuration::typology::{Expression, Operator, Term, TypologyRule, TypologyRuleWeight},
+ message::RuleResult,
+ };
+
+ use super::*;
+
+ fn make_rule_result(id: &str, version: &str, sub_ref: &str) -> RuleResult {
+ RuleResult {
+ id: id.to_string(),
+ version: version.to_string(),
+ sub_rule_ref: sub_ref.to_string(),
+ ..Default::default()
+ }
+ }
+
+ fn make_rule_value(id: &str, version: &str, ref_name: &str, weight: f64) -> TypologyRule {
+ TypologyRule {
+ id: id.to_string(),
+ version: version.to_string(),
+ wghts: vec![TypologyRuleWeight {
+ r#ref: ref_name.to_string(),
+ wght: weight,
+ }],
+ }
+ }
+
+ fn make_expression(terms: Vec<(&str, &str)>, op: Operator) -> Expression {
+ Expression {
+ terms: terms
+ .into_iter()
+ .map(|(id, version)| Term {
+ id: id.to_string(),
+ version: version.to_string(),
+ })
+ .collect(),
+ operator: op.into(),
+ }
+ }
+
+ #[test]
+ fn test_add_operator_multiple_terms() {
+ let mut typology_result = TypologyResult {
+ rule_results: vec![
+ make_rule_result("R1", "v1", "sub1"),
+ make_rule_result("R2", "v1", "sub2"),
+ ],
+ ..Default::default()
+ };
+
+ let config = TypologyConfiguration {
+ expression: Some(make_expression(
+ vec![("R1", "v1"), ("R2", "v1")],
+ Operator::Add,
+ )),
+ rules: vec![
+ make_rule_value("R1", "v1", "sub1", 10.0),
+ make_rule_value("R2", "v1", "sub2", 5.0),
+ ],
+ ..Default::default()
+ };
+
+ let result = evaluate_expression(&mut typology_result, &config).unwrap();
+ assert_eq!(result, 15.0);
+ }
+
+ #[test]
+ fn test_missing_rule_result_returns_zero() {
+ let mut typology_result = TypologyResult {
+ rule_results: vec![make_rule_result("R1", "v1", "sub1")],
+ ..Default::default()
+ };
+
+ let config = TypologyConfiguration {
+ expression: Some(make_expression(
+ vec![("R1", "v1"), ("R2", "v1")],
+ Operator::Add,
+ )),
+ rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)],
+ ..Default::default()
+ };
+
+ let result = evaluate_expression(&mut typology_result, &config).unwrap();
+ assert_eq!(result, 0.0);
+ }
+
+ #[test]
+ fn test_missing_weight_defaults_to_zero() {
+ let mut typology_result = TypologyResult {
+ rule_results: vec![make_rule_result("R1", "v1", "subX")], // sub_ref doesn't match
+ ..Default::default()
+ };
+
+ let config = TypologyConfiguration {
+ expression: Some(make_expression(vec![("R1", "v1")], Operator::Add)),
+ rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)], // different ref
+ ..Default::default()
+ };
+
+ let result = evaluate_expression(&mut typology_result, &config).unwrap();
+ assert_eq!(result, 0.0);
+ }
+}
diff --git a/crates/typologies/src/state.rs b/crates/typologies/src/state.rs
new file mode 100644
index 0000000..23e08d9
--- /dev/null
+++ b/crates/typologies/src/state.rs
@@ -0,0 +1,55 @@
+use std::sync::Arc;
+
+use async_nats::jetstream::Context;
+use moka::future::Cache;
+use tokio::sync::RwLock;
+use tonic::transport::Endpoint;
+use tracing::error;
+use warden_core::configuration::typology::{
+ TypologyConfiguration, TypologyConfigurationRequest,
+ query_typologies_client::QueryTypologiesClient,
+};
+use warden_stack::{Configuration, cache::RedisManager};
+
+use crate::cnfg::LocalConfig;
+use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor};
+
+#[derive(Clone)]
+pub struct Services {
+ pub jetstream: Context,
+ pub cache: RedisManager,
+}
+
+pub type AppHandle = Arc<AppState>;
+
+#[derive(Clone)]
+pub struct AppState {
+ pub services: Services,
+ pub local_cache: Arc<RwLock<Cache<TypologyConfigurationRequest, TypologyConfiguration>>>,
+ pub config: LocalConfig,
+ pub query_typology_client: QueryTypologiesClient<Intercepted>,
+}
+
+impl AppState {
+ pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> {
+ let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?;
+ let channel = Endpoint::new(config.config_endpoint.to_string())?
+ .connect()
+ .await
+ .inspect_err(|e| {
+ error!(
+ endpoint = ?config.config_endpoint,
+ "could not connect to config service: {e}",
+ )
+ })?;
+
+ let query_typology_client = QueryTypologiesClient::with_interceptor(channel, MyInterceptor);
+
+ Ok(Self {
+ services,
+ config,
+ local_cache: Arc::new(RwLock::new(Cache::builder().build())),
+ query_typology_client,
+ })
+ }
+}