diff options
Diffstat (limited to 'crates/rule-executor/src')
-rw-r--r-- | crates/rule-executor/src/main.rs | 14 | ||||
-rw-r--r-- | crates/rule-executor/src/processor/publish.rs | 44 | ||||
-rw-r--r-- | crates/rule-executor/src/processor/rule.rs | 23 | ||||
-rw-r--r-- | crates/rule-executor/src/processor/rule/configuration.rs | 2 | ||||
-rw-r--r-- | crates/rule-executor/src/processor/rule/determine_outcome.rs | 119 | ||||
-rw-r--r-- | crates/rule-executor/src/processor/rule/rule_901.rs | 126 | ||||
-rw-r--r-- | crates/rule-executor/src/state.rs | 3 |
7 files changed, 322 insertions, 9 deletions
diff --git a/crates/rule-executor/src/main.rs b/crates/rule-executor/src/main.rs index ed284c6..abae26d 100644 --- a/crates/rule-executor/src/main.rs +++ b/crates/rule-executor/src/main.rs @@ -1,4 +1,3 @@ -#[allow(dead_code)] mod cnfg; mod processor; @@ -47,6 +46,9 @@ async fn main() -> Result<()> { .nats_jetstream(&config.nats) .await .inspect_err(|e| error!("nats: {e}"))? + .postgres(&config.database) + .await + .inspect_err(|e| error!("postgres: {e}"))? .build(); let jetstream = services @@ -54,7 +56,15 @@ async fn main() -> Result<()> { .take() .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; - let services = state::Services { jetstream }; + let postgres = services + .postgres + .take() + .ok_or_else(|| anyhow::anyhow!("database is not ready"))?; + + let services = state::Services { + jetstream, + postgres, + }; processor::serve(services, config, provider) .await diff --git a/crates/rule-executor/src/processor/publish.rs b/crates/rule-executor/src/processor/publish.rs index 8b13789..0d35977 100644 --- a/crates/rule-executor/src/processor/publish.rs +++ b/crates/rule-executor/src/processor/publish.rs @@ -1 +1,45 @@ +use warden_stack::tracing::telemetry::nats::injector; +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing::{Instrument, Span, debug, info_span, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::message::Payload; + +use crate::state::AppHandle; + +pub(super) async fn to_typologies( + subject: &str, + state: AppHandle, + payload: Payload, +) -> anyhow::Result<()> { + // send transaction to next with nats + let subject = format!("{}.{}", state.config.nats.destination_prefix, subject); + debug!(subject = ?subject, "publishing"); + + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + span.set_attribute("otel.kind", "producer"); + state + .services + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .instrument(span) + .await + .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?; + + Ok(()) +} diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs index 3a54424..6eaf25c 100644 --- a/crates/rule-executor/src/processor/rule.rs +++ b/crates/rule-executor/src/processor/rule.rs @@ -2,15 +2,17 @@ use std::sync::Arc; use anyhow::Result; mod configuration; +mod determine_outcome; +mod rule_901; use async_nats::jetstream; use opentelemetry::global; -use tracing::{Span, error, instrument, warn}; +use tracing::{Span, debug, error, instrument, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt; use warden_core::{configuration::rule::RuleConfigurationRequest, message::Payload}; use warden_stack::tracing::telemetry::nats; -use crate::state::AppHandle; +use crate::{processor::publish, state::AppHandle}; #[instrument( skip(message, state), @@ -27,7 +29,7 @@ pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Resu span.set_parent(context); }; - let payload: Payload = prost::Message::decode(message.payload.as_ref())?; + let mut payload: Payload = prost::Message::decode(message.payload.as_ref())?; if payload.transaction.is_none() { warn!("transaction is empty - proceeding with ack"); @@ -58,10 +60,23 @@ pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Resu span.record("rule_id", &req.id); span.record("rule_version", &req.version); - let _rule_configuration = configuration::get_configuration(req, Arc::clone(&state)) + let config = configuration::get_configuration(req, Arc::clone(&state)) .await .unwrap(); + match rule_901::process_901(&config, &payload, state.clone()).await { + Ok(res) => { + debug!(outcome = ?res.reason, "rule executed"); + payload.rule_result = Some(res); + publish::to_typologies(&config.id, state, payload) + .await + .inspect_err(|e| error!("{e}"))?; + } + Err(e) => { + error!("{e}"); + } + }; + if let Err(e) = message.ack().await { error!("ack error {e:?}"); }; diff --git a/crates/rule-executor/src/processor/rule/configuration.rs b/crates/rule-executor/src/processor/rule/configuration.rs index 5f384aa..d8579e6 100644 --- a/crates/rule-executor/src/processor/rule/configuration.rs +++ b/crates/rule-executor/src/processor/rule/configuration.rs @@ -37,10 +37,8 @@ pub(super) async fn get_configuration( .configuration .ok_or_else(|| anyhow!("missing configuration"))?; - println!("inserting"); let cache = state.local_cache.write().await; cache.insert(request, config.clone()).await; - println!("inserted"); Ok(config) } diff --git a/crates/rule-executor/src/processor/rule/determine_outcome.rs b/crates/rule-executor/src/processor/rule/determine_outcome.rs new file mode 100644 index 0000000..727846d --- /dev/null +++ b/crates/rule-executor/src/processor/rule/determine_outcome.rs @@ -0,0 +1,119 @@ +use tracing::trace; +use warden_core::{configuration::rule::Band, message::RuleResult}; + +pub(super) fn determine_outcome(value: i64, bands: &[Band], rule_result: &mut RuleResult) { + trace!("calculating outcome"); + for band in bands { + let value_f64 = value as f64; + + if band.lower_limit.is_none_or(|lower| value_f64 >= lower) + && band.upper_limit.is_none_or(|upper| value_f64 < upper) + { + rule_result.sub_rule_ref = band.sub_rule_ref.to_owned(); + rule_result.reason = band.reason.to_owned(); + break; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_band(lower: Option<f64>, upper: Option<f64>, sub_ref: &str, reason: &str) -> Band { + Band { + lower_limit: lower, + upper_limit: upper, + sub_rule_ref: sub_ref.to_string(), + reason: reason.to_string(), + } + } + + #[test] + fn matches_band_within_limits() { + let bands = vec![ + make_band(Some(0.0), Some(10.0), "A", "Between 0 and 10"), + make_band(Some(10.0), Some(20.0), "B", "Between 10 and 20"), + ]; + let mut rule_result = RuleResult::default(); + + determine_outcome(5, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "A"); + assert_eq!(rule_result.reason, "Between 0 and 10"); + } + + #[test] + fn matches_band_lower_inclusive_upper_exclusive() { + let bands = vec![ + make_band(Some(0.0), Some(10.0), "A", "Between 0 and 10"), + make_band(Some(10.0), Some(20.0), "B", "Between 10 and 20"), + ]; + let mut rule_result = RuleResult::default(); + + determine_outcome(10, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "B"); + assert_eq!(rule_result.reason, "Between 10 and 20"); + } + + #[test] + fn no_match_when_above_all_bands() { + let bands = vec![ + make_band(Some(0.0), Some(10.0), "A", "Between 0 and 10"), + make_band(Some(10.0), Some(20.0), "B", "Between 10 and 20"), + ]; + let mut rule_result = RuleResult::default(); + + determine_outcome(30, &bands, &mut rule_result); + + assert_eq!(rule_result, RuleResult::default()); + } + + #[test] + fn match_when_no_upper_limit() { + let bands = vec![make_band(Some(0.0), None, "A", "Above 0")]; + let mut rule_result = RuleResult::default(); + + determine_outcome(100, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "A"); + assert_eq!(rule_result.reason, "Above 0"); + } + + #[test] + fn match_when_no_lower_limit() { + let bands = vec![make_band(None, Some(50.0), "A", "Below 50")]; + let mut rule_result = RuleResult::default(); + + determine_outcome(-10, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "A"); + assert_eq!(rule_result.reason, "Below 50"); + } + + #[test] + fn match_when_no_limits() { + let bands = vec![make_band(None, None, "A", "Any value")]; + let mut rule_result = RuleResult::default(); + + determine_outcome(9999, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "A"); + assert_eq!(rule_result.reason, "Any value"); + } + + #[test] + fn stops_after_first_match() { + let bands = vec![ + make_band(None, None, "A", "Any value"), + make_band(None, None, "B", "Second band"), + ]; + let mut rule_result = RuleResult::default(); + + determine_outcome(5, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "A"); + assert_eq!(rule_result.reason, "Any value"); + } +} diff --git a/crates/rule-executor/src/processor/rule/rule_901.rs b/crates/rule-executor/src/processor/rule/rule_901.rs new file mode 100644 index 0000000..8dfe036 --- /dev/null +++ b/crates/rule-executor/src/processor/rule/rule_901.rs @@ -0,0 +1,126 @@ +use anyhow::{Result, anyhow}; +use determine_outcome::determine_outcome; +use opentelemetry_semantic_conventions::attribute; +use serde::Deserialize; +use sqlx::types::BigDecimal; +use time::OffsetDateTime; +use tracing::{Instrument, error, info_span, trace}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::{ + configuration::rule::RuleConfiguration, + iso20022::TransactionType, + message::{Payload, RuleResult}, +}; + +use crate::{processor::rule::determine_outcome, state::AppHandle}; + +#[derive(Deserialize)] +pub struct Parameters { + max_query_range: f64, +} + +pub(super) async fn process_901( + configuration: &RuleConfiguration, + payload: &Payload, + state: AppHandle, +) -> Result<RuleResult> { + let mut rule_result = RuleResult { + id: configuration.id.to_string(), + version: configuration.version.to_string(), + ..Default::default() + }; + let c = configuration.configuration.as_ref(); + + let bands = c + .and_then(|value| { + if value.bands.is_empty() { + None + } else { + Some(&value.bands) + } + }) + .ok_or_else(|| anyhow!("no bands available"))?; + + let exit_conditions = c + .and_then(|value| { + if value.exit_conditions.is_empty() { + None + } else { + Some(&value.exit_conditions) + } + }) + .ok_or_else(|| anyhow!("no exit conditions available"))?; + + let parameters = c + .and_then(|value| value.parameters.as_ref()) + .ok_or_else(|| anyhow!("no parameters available"))?; + + let params: Parameters = serde_json::from_value(parameters.clone().into()) + .inspect_err(|e| error!("failed to deserailise params: {e:?}"))?; + + let unsuccessful_transaction = exit_conditions + .iter() + .find(|value| value.sub_rule_ref.eq(".x00")); + + if let Some(warden_core::message::payload::Transaction::Pacs002(pacs002_document)) = + payload.transaction.as_ref() + { + let tx_sts = pacs002_document + .f_i_to_f_i_pmt_sts_rpt + .tx_inf_and_sts + .first() + .ok_or_else(|| anyhow::anyhow!("tx sts to be there"))?; + + if tx_sts.tx_sts().ne("ACCC") { + let unsuccessful_transaction = unsuccessful_transaction + .ok_or_else(|| anyhow::anyhow!("no unsuccessful transaction ref"))?; + rule_result.reason = unsuccessful_transaction.reason.to_owned(); + rule_result.sub_rule_ref = unsuccessful_transaction.sub_rule_ref.to_owned(); + + return Ok(rule_result); + } + + let current_pacs002_timeframe: OffsetDateTime = pacs002_document + .f_i_to_f_i_pmt_sts_rpt + .grp_hdr + .cre_dt_tm + .try_into()?; + + let data_cache = payload + .data_cache + .as_ref() + .ok_or_else(|| anyhow::anyhow!("data cache is missing"))?; + + let range = BigDecimal::try_from(params.max_query_range)?; + + let tx_tp = TransactionType::PACS002.to_string(); + + let span = info_span!("rule.logic"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "901"); + span.set_attribute("otel.kind", "client"); + + trace!("executing rule query"); + let recent_transactions = sqlx::query_scalar!( + "select count(*) from transaction_relationship tr + where tr.destination = $1 + and tr.tx_tp = $2 + and extract(epoch from ($3::timestamptz - tr.cre_dt_tm)) * 1000 <= $4 + and tr.cre_dt_tm <= $3::timestamptz", + data_cache.dbtr_acct_id, + tx_tp, + current_pacs002_timeframe, + range, + ) + .fetch_one(&state.services.postgres) + .instrument(span) + .await? + .ok_or_else(|| anyhow::anyhow!("no data"))?; + + determine_outcome(recent_transactions, bands.as_ref(), &mut rule_result); + + Ok(rule_result) + } else { + Err(anyhow::anyhow!("no valid transaction")) + } +} diff --git a/crates/rule-executor/src/state.rs b/crates/rule-executor/src/state.rs index efad4ea..432068e 100644 --- a/crates/rule-executor/src/state.rs +++ b/crates/rule-executor/src/state.rs @@ -9,7 +9,7 @@ use warden_core::configuration::rule::{ RuleConfiguration, RuleConfigurationRequest, query_rule_configuration_client::QueryRuleConfigurationClient, }; -use warden_stack::Configuration; +use warden_stack::{Configuration, sqlx::PgPool}; use crate::cnfg::LocalConfig; use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor}; @@ -17,6 +17,7 @@ use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor}; #[derive(Clone)] pub struct Services { pub jetstream: Context, + pub postgres: PgPool, } pub type AppHandle = Arc<AppState>; |