From 73d7bab8844bb21c7a9143c30800c2d11d411e42 Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Sun, 17 Aug 2025 20:02:49 +0200 Subject: feat: typology processor (#8) --- crates/typologies/src/processor/driver.rs | 40 ++++ crates/typologies/src/processor/publish.rs | 44 +++++ crates/typologies/src/processor/reload.rs | 71 ++++++++ crates/typologies/src/processor/typology.rs | 180 ++++++++++++++++++ .../src/processor/typology/aggregate_rules.rs | 202 +++++++++++++++++++++ .../src/processor/typology/evaluate_expression.rs | 171 +++++++++++++++++ 6 files changed, 708 insertions(+) create mode 100644 crates/typologies/src/processor/driver.rs create mode 100644 crates/typologies/src/processor/publish.rs create mode 100644 crates/typologies/src/processor/reload.rs create mode 100644 crates/typologies/src/processor/typology.rs create mode 100644 crates/typologies/src/processor/typology/aggregate_rules.rs create mode 100644 crates/typologies/src/processor/typology/evaluate_expression.rs (limited to 'crates/typologies/src/processor') diff --git a/crates/typologies/src/processor/driver.rs b/crates/typologies/src/processor/driver.rs new file mode 100644 index 0000000..d150620 --- /dev/null +++ b/crates/typologies/src/processor/driver.rs @@ -0,0 +1,40 @@ +use tonic::IntoRequest; +use warden_core::configuration::typology::{TypologyConfiguration, TypologyConfigurationRequest}; + +use crate::state::AppHandle; + +pub trait GetTypologyConfiguration { + fn get_typology_config( + &self, + typology_key: TypologyConfigurationRequest, + ) -> impl std::future::Future> + Send; +} + +impl GetTypologyConfiguration for AppHandle { + async fn get_typology_config( + &self, + typology_key: TypologyConfigurationRequest, + ) -> anyhow::Result { + { + let local_cache = self.local_cache.read().await; + if let Some(result) = local_cache.get(&typology_key).await.map(Ok) { + return result; + } + } + + let local_cache = self.local_cache.write().await; + let mut client = self.query_typology_client.clone(); + + let value = client + .get_typology_configuration(typology_key.clone().into_request()) + .await? + .into_inner() + .configuration + .ok_or_else(|| anyhow::anyhow!("configuration unavailable"))?; + local_cache + .insert(typology_key.clone(), value.clone()) + .await; + + Ok(value) + } +} diff --git a/crates/typologies/src/processor/publish.rs b/crates/typologies/src/processor/publish.rs new file mode 100644 index 0000000..b031bf3 --- /dev/null +++ b/crates/typologies/src/processor/publish.rs @@ -0,0 +1,44 @@ +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing::{Instrument, Span, debug, info_span, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::message::Payload; +use warden_stack::tracing::telemetry::nats::injector; + +use crate::state::AppHandle; + +pub(crate) async fn to_tadp( + subject: &str, + state: AppHandle, + payload: Payload, +) -> anyhow::Result<()> { + // send transaction to next with nats + let subject = format!("{}.{}", state.config.nats.destination_prefix, subject); + debug!(subject = ?subject, "publishing"); + + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + span.set_attribute(attribute::MESSAGING_SYSTEM, "nats"); + state + .services + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .instrument(span) + .await + .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?; + + Ok(()) +} diff --git a/crates/typologies/src/processor/reload.rs b/crates/typologies/src/processor/reload.rs new file mode 100644 index 0000000..fac4c40 --- /dev/null +++ b/crates/typologies/src/processor/reload.rs @@ -0,0 +1,71 @@ +use async_nats::jetstream::consumer; +use futures_util::StreamExt; +use prost::Message as _; +use tracing::{error, info, trace}; +use uuid::Uuid; +use warden_core::configuration::{ConfigKind, ReloadEvent, typology::TypologyConfigurationRequest}; + +use crate::state::AppHandle; + +pub async fn reload(state: AppHandle) -> anyhow::Result<()> { + let id = Uuid::now_v7().to_string(); + info!(durable = id, "listening for configuration changes"); + + let durable = &id; + let consumer = state + .services + .jetstream + .get_stream(state.config.nats.config.stream.to_string()) + .await? + .get_or_create_consumer( + durable, + consumer::pull::Config { + durable_name: Some(durable.to_string()), + filter_subject: state.config.nats.config.reload_subject.to_string(), + deliver_policy: consumer::DeliverPolicy::LastPerSubject, + ..Default::default() + }, + ) + .await?; + + let mut messages = consumer.messages().await?; + while let Some(value) = messages.next().await { + match value { + Ok(message) => { + trace!("got reload cache event"); + if let Ok(res) = ReloadEvent::decode(message.payload.as_ref()) + && let Ok(kind) = ConfigKind::try_from(res.kind) + { + match kind { + ConfigKind::Typology => { + let local_cache = state.local_cache.write().await; + let id = res.id(); + let version = res.version(); + trace!( + id = id, + ver = version, + "update triggered, invalidating typology config" + ); + let key = TypologyConfigurationRequest { + id: id.to_string(), + version: version.to_string(), + }; + + local_cache.invalidate(&key).await; + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + _ => { + trace!(kind = ?kind, "detected reload event, nothing to do here, acknowledging"); + let _ = message.ack().await.inspect_err(|e| error!("{e}")); + } + } + } + } + Err(e) => { + error!("{e:?}") + } + } + } + + Ok(()) +} diff --git a/crates/typologies/src/processor/typology.rs b/crates/typologies/src/processor/typology.rs new file mode 100644 index 0000000..b1b2592 --- /dev/null +++ b/crates/typologies/src/processor/typology.rs @@ -0,0 +1,180 @@ +mod aggregate_rules; +mod evaluate_expression; + +use std::sync::Arc; + +use anyhow::Result; +use opentelemetry::global; +use prost::Message; +use tracing::{Instrument, Span, error, info, info_span, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::{ + configuration::{routing::RoutingConfiguration, typology::TypologyConfigurationRequest}, + message::{Payload, RuleResult, TypologyResult}, +}; +use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor}; + +use crate::{ + processor::{driver::GetTypologyConfiguration as _, publish}, + state::AppHandle, +}; + +#[instrument(skip(message, state), err(Debug))] +pub async fn process_typology( + message: async_nats::jetstream::Message, + state: AppHandle, +) -> Result<()> { + let span = Span::current(); + + if let Some(ref headers) = message.headers { + let context = global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor::HeaderMap(headers)) + }); + span.set_parent(context); + }; + + let payload: Payload = Message::decode(message.payload.as_ref())?; + + if payload.transaction.is_none() { + warn!("transaction is empty - proceeding with ack"); + let _ = message.ack().await; + return Ok(()); + } + + let transaction = payload.transaction.as_ref().expect("to have returned"); + + match transaction { + warden_core::message::payload::Transaction::Pacs008(_) => { + warn!("Pacs008 is unsupported on this version: this should be unreachable"); + } + warden_core::message::payload::Transaction::Pacs002(pacs002_document) => { + let key = format!( + "tp_{}", + pacs002_document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id + ); + + let rule_result = &payload + .rule_result + .as_ref() + .expect("rule result should be here"); + let rule_results = cache_and_get_all(&key, rule_result, Arc::clone(&state)).await?; + + let routing = payload + .routing + .as_ref() + .expect("routing missing from payload"); + + let (mut typology_result, _rule_count) = + aggregate_rules::aggregate_rules(&rule_results, routing, rule_result)?; + + let _ = evaluate_typology(&mut typology_result, routing, payload.clone(), &key, state) + .await + .inspect_err(|e| error!("{e}")); + } + }; + + let span = info_span!("nats.ack"); + message + .ack() + .instrument(span) + .await + .map_err(|_| anyhow::anyhow!("ack error"))?; + + Ok(()) +} + +#[instrument(skip(typology_result, routing, payload, state), err(Debug))] +async fn evaluate_typology( + typology_result: &mut [TypologyResult], + routing: &RoutingConfiguration, + mut payload: Payload, + key: &str, + state: AppHandle, +) -> Result<()> { + for typology_result in typology_result.iter_mut() { + let handle = Arc::clone(&state); + let routing_rules = routing.messages[0].typologies.iter().find(|typology| { + typology.version.eq(&typology_result.version) && typology.id.eq(&typology_result.id) + }); + let typology_result_rules = &typology_result.rule_results; + + if routing_rules.is_some() + && typology_result_rules.len() < routing_rules.unwrap().rules.len() + { + continue; + } + + let typology_config = handle + .get_typology_config(TypologyConfigurationRequest { + id: typology_result.id.to_owned(), + version: typology_result.version.to_owned(), + }) + .await?; + + let result = evaluate_expression::evaluate_expression(typology_result, &typology_config)?; + + typology_result.result = result; + + let workflow = typology_config + .workflow + .as_ref() + .expect("no workflow in config"); + + if workflow.interdiction_threshold.is_some() { + typology_result.workflow.replace(*workflow); + } + typology_result.review = result.ge(&typology_config.workflow.unwrap().alert_threshold); + + payload.typology_result = Some(typology_result.to_owned()); + + let is_interdicting = typology_config + .workflow + .unwrap() + .interdiction_threshold + .is_some_and(|value| value > 0.0 && result >= value); + + if is_interdicting { + typology_result.review = true; + } + + if result >= typology_config.workflow.unwrap().alert_threshold { + info!("alerting"); + } + + let subj = handle.config.nats.destination_prefix.to_string(); + let _ = publish::to_tadp(&subj, handle, payload.clone()) + .await + .inspect_err(|e| error!("{e}")); + + let mut c = state.services.cache.get().await?; + c.del::<_, ()>(key).await?; + } + + Ok(()) +} + +async fn cache_and_get_all( + cache_key: &str, + rule_result: &RuleResult, + state: AppHandle, +) -> Result> { + let mut cache = state.services.cache.get().await?; + + let bytes = prost::Message::encode_to_vec(rule_result); + + let res = warden_stack::redis::pipe() + .sadd::<_, _>(cache_key, bytes) + .ignore() + .smembers(cache_key) + .query_async::>>>(&mut cache) + .await?; + + let members = res + .first() + .ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?; + + members + .iter() + .map(|value| RuleResult::decode(value.as_ref()).map_err(anyhow::Error::new)) + .collect() +} diff --git a/crates/typologies/src/processor/typology/aggregate_rules.rs b/crates/typologies/src/processor/typology/aggregate_rules.rs new file mode 100644 index 0000000..8f92dae --- /dev/null +++ b/crates/typologies/src/processor/typology/aggregate_rules.rs @@ -0,0 +1,202 @@ +use anyhow::Result; +use std::collections::HashSet; + +use warden_core::{ + configuration::routing::RoutingConfiguration, + message::{RuleResult, TypologyResult}, +}; + +pub(super) fn aggregate_rules( + rule_results: &[RuleResult], + routing: &RoutingConfiguration, + rule_result: &RuleResult, +) -> Result<(Vec, usize)> { + let mut typology_result: Vec = vec![]; + let mut all_rules_set = HashSet::new(); + + routing.messages.iter().for_each(|message| { + message.typologies.iter().for_each(|typology| { + let mut set = HashSet::new(); + + for rule in typology.rules.iter() { + set.insert((&rule.id, rule.version())); + all_rules_set.insert((&rule.id, rule.version())); + } + + if !set.contains(&(&rule_result.id, rule_result.version.as_str())) { + return; + } + + let rule_results: Vec<_> = rule_results + .iter() + .filter_map(|value| { + if set.contains(&(&value.id, &value.version)) { + Some(value.to_owned()) + } else { + None + } + }) + .collect(); + + if !rule_results.is_empty() { + typology_result.push(TypologyResult { + id: typology.id.to_owned(), + version: typology.version.to_owned(), + rule_results, + ..Default::default() + }); + } + }); + }); + + Ok((typology_result, all_rules_set.len())) +} + +#[cfg(test)] +mod tests { + use super::*; + use warden_core::{ + configuration::routing::{Message, RoutingConfiguration, Rule, Typology}, + message::RuleResult, + }; + + fn create_rule(id: &str, version: &str) -> Rule { + Rule { + id: id.to_string(), + version: Some(version.to_string()), + } + } + + fn create_rule_result(id: &str, version: &str) -> RuleResult { + RuleResult { + id: id.to_string(), + version: version.to_string(), + ..Default::default() + } + } + + #[test] + fn returns_empty_when_no_matching_typology() { + let routing = RoutingConfiguration { + messages: vec![Message { + typologies: vec![Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1")], + }], + ..Default::default() + }], + ..Default::default() + }; + + let rule_results = vec![create_rule_result("R2", "v1")]; + let input_rule = create_rule_result("R2", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + assert!(result.is_empty()); + assert_eq!(count, 1); // one rule in routing + } + + #[test] + fn returns_typology_with_matching_rule() { + let routing = RoutingConfiguration { + messages: vec![Message { + typologies: vec![Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")], + }], + ..Default::default() + }], + ..Default::default() + }; + + let rule_results = vec![ + create_rule_result("R1", "v1"), + create_rule_result("R2", "v1"), + ]; + + let input_rule = create_rule_result("R1", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + + assert_eq!(count, 2); // R1, R2 + assert_eq!(result.len(), 1); + assert_eq!(result[0].id, "T1"); + assert_eq!(result[0].rule_results.len(), 2); + } + + #[test] + fn ignores_unrelated_rules_in_rule_results() { + let routing = RoutingConfiguration { + messages: vec![Message { + typologies: vec![Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1")], + }], + ..Default::default() + }], + ..Default::default() + }; + + let rule_results = vec![ + create_rule_result("R1", "v1"), + create_rule_result("R99", "v1"), // unrelated + ]; + + let input_rule = create_rule_result("R1", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + + assert_eq!(count, 1); + assert_eq!(result.len(), 1); + assert_eq!(result[0].rule_results.len(), 1); + assert_eq!(result[0].rule_results[0].id, "R1"); + } + + #[test] + fn handles_multiple_messages_and_typologies() { + let routing = RoutingConfiguration { + messages: vec![ + Message { + typologies: vec![ + Typology { + id: "T1".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1")], + }, + Typology { + id: "T2".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R2", "v1")], + }, + ], + ..Default::default() + }, + Message { + typologies: vec![Typology { + id: "T3".to_string(), + version: "v1".to_string(), + rules: vec![create_rule("R1", "v1"), create_rule("R2", "v1")], + }], + ..Default::default() + }, + ], + ..Default::default() + }; + + let rule_results = vec![ + create_rule_result("R1", "v1"), + create_rule_result("R2", "v1"), + ]; + let input_rule = create_rule_result("R1", "v1"); + + let (result, count) = aggregate_rules(&rule_results, &routing, &input_rule).unwrap(); + + assert_eq!(count, 2); // R1, R2 appear in multiple typologies, but unique rules are 2 + assert_eq!(result.len(), 2); // T1 (R1) and T3 (R1 & R2) + assert_eq!(result[0].id, "T1"); + assert_eq!(result[1].id, "T3"); + } +} diff --git a/crates/typologies/src/processor/typology/evaluate_expression.rs b/crates/typologies/src/processor/typology/evaluate_expression.rs new file mode 100644 index 0000000..844011e --- /dev/null +++ b/crates/typologies/src/processor/typology/evaluate_expression.rs @@ -0,0 +1,171 @@ +use anyhow::Result; +use tracing::warn; +use warden_core::{configuration::typology::TypologyConfiguration, message::TypologyResult}; + +pub(super) fn evaluate_expression( + typology_result: &mut TypologyResult, + typology_config: &TypologyConfiguration, +) -> Result { + let mut to_return = 0.0; + let expression = typology_config + .expression + .as_ref() + .expect("expression is missing"); + + let rule_values = &typology_config.rules; + + for rule in expression.terms.iter() { + let rule_result = typology_result + .rule_results + .iter() + .find(|value| value.id.eq(&rule.id) && value.version.eq(&rule.version)); + + if rule_result.is_none() { + warn!(term = ?rule, "could not find rule result for typology term"); + return Ok(Default::default()); + } + + let rule_result = rule_result.expect("checked and is some"); + + let weight = rule_values + .iter() + .filter_map(|rv| { + if !(rv.id.eq(&rule_result.id) && rv.version.eq(&rule_result.version)) { + None + } else { + rv.wghts.iter().find_map(|value| { + match value.r#ref.eq(&rule_result.sub_rule_ref) { + true => Some(value.wght), + false => None, + } + }) + } + }) + .next(); + + if weight.is_none() { + warn!(rule = ?rule, "could not find a weight for the matching rule"); + } + let weight = weight.unwrap_or_default(); + + to_return = match expression.operator() { + warden_core::configuration::typology::Operator::Add => to_return + weight, + warden_core::configuration::typology::Operator::Multiply => to_return * weight, + warden_core::configuration::typology::Operator::Subtract => to_return - weight, + warden_core::configuration::typology::Operator::Divide => { + if weight.ne(&0.0) { + to_return / weight + } else { + to_return + } + } + }; + } + Ok(to_return) +} + +#[cfg(test)] +mod tests { + use warden_core::{ + configuration::typology::{Expression, Operator, Term, TypologyRule, TypologyRuleWeight}, + message::RuleResult, + }; + + use super::*; + + fn make_rule_result(id: &str, version: &str, sub_ref: &str) -> RuleResult { + RuleResult { + id: id.to_string(), + version: version.to_string(), + sub_rule_ref: sub_ref.to_string(), + ..Default::default() + } + } + + fn make_rule_value(id: &str, version: &str, ref_name: &str, weight: f64) -> TypologyRule { + TypologyRule { + id: id.to_string(), + version: version.to_string(), + wghts: vec![TypologyRuleWeight { + r#ref: ref_name.to_string(), + wght: weight, + }], + } + } + + fn make_expression(terms: Vec<(&str, &str)>, op: Operator) -> Expression { + Expression { + terms: terms + .into_iter() + .map(|(id, version)| Term { + id: id.to_string(), + version: version.to_string(), + }) + .collect(), + operator: op.into(), + } + } + + #[test] + fn test_add_operator_multiple_terms() { + let mut typology_result = TypologyResult { + rule_results: vec![ + make_rule_result("R1", "v1", "sub1"), + make_rule_result("R2", "v1", "sub2"), + ], + ..Default::default() + }; + + let config = TypologyConfiguration { + expression: Some(make_expression( + vec![("R1", "v1"), ("R2", "v1")], + Operator::Add, + )), + rules: vec![ + make_rule_value("R1", "v1", "sub1", 10.0), + make_rule_value("R2", "v1", "sub2", 5.0), + ], + ..Default::default() + }; + + let result = evaluate_expression(&mut typology_result, &config).unwrap(); + assert_eq!(result, 15.0); + } + + #[test] + fn test_missing_rule_result_returns_zero() { + let mut typology_result = TypologyResult { + rule_results: vec![make_rule_result("R1", "v1", "sub1")], + ..Default::default() + }; + + let config = TypologyConfiguration { + expression: Some(make_expression( + vec![("R1", "v1"), ("R2", "v1")], + Operator::Add, + )), + rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)], + ..Default::default() + }; + + let result = evaluate_expression(&mut typology_result, &config).unwrap(); + assert_eq!(result, 0.0); + } + + #[test] + fn test_missing_weight_defaults_to_zero() { + let mut typology_result = TypologyResult { + rule_results: vec![make_rule_result("R1", "v1", "subX")], // sub_ref doesn't match + ..Default::default() + }; + + let config = TypologyConfiguration { + expression: Some(make_expression(vec![("R1", "v1")], Operator::Add)), + rules: vec![make_rule_value("R1", "v1", "sub1", 10.0)], // different ref + ..Default::default() + }; + + let result = evaluate_expression(&mut typology_result, &config).unwrap(); + assert_eq!(result, 0.0); + } +} -- cgit v1.2.3