1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
use async_nats::jetstream::Message;
use opentelemetry::global;
use opentelemetry_semantic_conventions::attribute;
use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use uuid::Uuid;
use warden_core::{
configuration::routing::RoutingConfiguration,
message::{AggregationResult, Payload, TypologyResult, payload::Transaction},
};
use warden_stack::{redis::AsyncCommands, tracing::telemetry::nats::extractor};
use crate::state::AppHandle;
#[instrument(skip(message, state), err(Debug))]
pub async fn handle(message: Message, state: AppHandle) -> anyhow::Result<()> {
let span = Span::current();
if let Some(ref headers) = message.headers {
let context = global::get_text_map_propagator(|propagator| {
propagator.extract(&extractor::HeaderMap(headers))
});
span.set_parent(context);
};
let mut payload: Payload = prost::Message::decode(message.payload.as_ref())?;
if let (Some(ref typology_result), Some(Transaction::Pacs002(document)), Some(routing)) = (
payload.typology_result.take(),
&payload.transaction,
&payload.routing,
) {
let cache_key = format!("tadp_{}_tp", document.f_i_to_f_i_pmt_sts_rpt.grp_hdr.msg_id);
let (typology_results, review) =
handle_typologies(typology_result, &state, &cache_key, routing).await?;
if typology_results
.len()
.ne(&routing.messages[0].typologies.len())
{
trace!("insufficient typology results for this typology. waiting for more");
return Ok(());
}
let aggs = AggregationResult {
id: routing.messages[0].id.to_owned(),
version: routing.messages[0].version.to_owned(),
typology_results,
review,
};
payload.aggregation_result = Some(aggs);
let _ = payload.rule_result.take();
let id = Uuid::now_v7();
debug!(%id, "inserting evaluation result");
let span = info_span!("create.evaluations.evaluation");
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
span.set_attribute(attribute::DB_COLLECTION_NAME, "transaction");
span.set_attribute("otel.kind", "client");
sqlx::query!(
"insert into evaluation (id, document) values ($1, $2)",
id,
sqlx::types::Json(&payload) as _
)
.execute(&state.services.postgres)
.instrument(span)
.await?;
info!(%id, "evaluation added");
let mut cache = state.services.cache.get().await?;
let span = Span::current();
span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
span.set_attribute(attribute::DB_OPERATION_NAME, "del");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
span.set_attribute("otel.kind", "client");
debug!("cache cleared");
cache.del::<_, ()>(&cache_key).await?;
} else {
error!("payload has insufficient data");
}
let span = info_span!("nats.ack");
message
.ack()
.instrument(span)
.await
.map_err(|_| anyhow::anyhow!("ack error"))?;
Ok(())
}
async fn handle_typologies(
payload: &TypologyResult,
state: &AppHandle,
cache_key: &str,
routing: &RoutingConfiguration,
) -> anyhow::Result<(Vec<TypologyResult>, bool)> {
let mut cache = state.services.cache.get().await?;
let bytes = prost::Message::encode_to_vec(payload);
let span = Span::current();
span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
span.set_attribute(attribute::DB_OPERATION_NAME, "sadd+scard");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
span.set_attribute("otel.kind", "client");
debug!("saving typology result");
let res = warden_stack::redis::pipe()
.sadd::<_, _>(cache_key, bytes)
.ignore()
.scard(cache_key)
.query_async::<Vec<usize>>(&mut cache)
.instrument(span)
.await?;
let typology_count = res
.first()
.ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
let typologies = &routing.messages[0].typologies;
if typology_count.lt(&typologies.len()) {
return Ok((vec![], false));
}
debug!("getting all typology results");
let span = Span::current();
span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
span.set_attribute(attribute::DB_OPERATION_NAME, "smembers");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, cache_key.to_string());
span.set_attribute("otel.kind", "client");
let res = cache
.smembers::<_, Vec<Vec<Vec<u8>>>>(cache_key)
.instrument(span)
.await?;
let members = res
.first()
.ok_or_else(|| anyhow::anyhow!("smembers did not return anything"))?;
let typologies: Result<Vec<TypologyResult>, _> = members
.iter()
.map(|value| {
<TypologyResult as prost::Message>::decode(value.as_ref()).map_err(anyhow::Error::new)
})
.collect();
let typologies = typologies?;
let mut review = false;
for typology in routing.messages[0].typologies.iter() {
if let Some(value) = typologies
.iter()
.find(|value| value.id.eq(&typology.id) && value.version.eq(&typology.version))
&& value.review
{
review = true;
}
}
Ok((typologies, review))
}
|