aboutsummaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
Diffstat (limited to 'crates')
-rw-r--r--crates/configuration/Cargo.toml3
-rw-r--r--crates/configuration/src/server.rs12
-rw-r--r--crates/configuration/src/server/http_svc.rs44
-rw-r--r--crates/configuration/src/server/http_svc/routes.rs14
-rw-r--r--crates/configuration/src/server/http_svc/routes/routing/get_active.rs52
-rw-r--r--crates/configuration/src/server/http_svc/routes/routing/post_routing.rs99
-rw-r--r--crates/configuration/src/server/http_svc/routes/rule.rs182
-rw-r--r--crates/configuration/src/server/http_svc/routes/rule/create.rs90
-rw-r--r--crates/configuration/src/server/http_svc/routes/rule/delete.rs102
-rw-r--r--crates/configuration/src/server/http_svc/routes/rule/get.rs113
-rw-r--r--crates/configuration/src/server/http_svc/routes/rule/update.rs133
-rw-r--r--crates/configuration/src/server/http_svc/routes/typology.rs224
-rw-r--r--crates/configuration/src/server/http_svc/routes/typology/create_typology.rs103
-rw-r--r--crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs116
-rw-r--r--crates/configuration/src/server/http_svc/routes/typology/get_typology.rs126
-rw-r--r--crates/configuration/src/server/http_svc/routes/typology/post_typology.rs175
-rw-r--r--crates/configuration/src/state/rule/mutate_rule.rs4
-rw-r--r--crates/configuration/src/state/typology/mutate_typology.rs4
-rw-r--r--crates/rule-executor/.env.example1
-rw-r--r--crates/rule-executor/Cargo.toml8
-rw-r--r--crates/rule-executor/src/main.rs14
-rw-r--r--crates/rule-executor/src/processor/publish.rs44
-rw-r--r--crates/rule-executor/src/processor/rule.rs23
-rw-r--r--crates/rule-executor/src/processor/rule/configuration.rs2
-rw-r--r--crates/rule-executor/src/processor/rule/determine_outcome.rs119
-rw-r--r--crates/rule-executor/src/processor/rule/rule_901.rs126
-rw-r--r--crates/rule-executor/src/state.rs3
-rw-r--r--crates/typologies/src/main.rs3
-rw-r--r--crates/typologies/src/processor/typology.rs2
-rw-r--r--crates/warden/src/server.rs230
-rw-r--r--crates/warden/src/server/routes/processor/pacs008.rs365
-rw-r--r--crates/warden/src/version.rs272
32 files changed, 2548 insertions, 260 deletions
diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml
index b290f08..a77a215 100644
--- a/crates/configuration/Cargo.toml
+++ b/crates/configuration/Cargo.toml
@@ -54,6 +54,9 @@ redoc = ["dep:utoipa-redoc", "utoipa-redoc/axum"]
rapidoc = ["dep:utoipa-rapidoc", "utoipa-rapidoc/axum"]
scalar = ["dep:utoipa-scalar", "utoipa-scalar/axum"]
+[dev-dependencies]
+tower = { workspace = true, features = ["util"] }
+
[dependencies.warden-stack]
workspace = true
features = ["api", "cache", "nats-jetstream", "postgres", "opentelemetry-tonic", "tracing-loki"]
diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs
index e31fc60..e31a57b 100644
--- a/crates/configuration/src/server.rs
+++ b/crates/configuration/src/server.rs
@@ -16,6 +16,10 @@ use warden_core::{
mutate_rule_configuration_server::MutateRuleConfigurationServer,
query_rule_configuration_server::QueryRuleConfigurationServer,
},
+ typology::{
+ mutate_typologies_server::MutateTypologiesServer,
+ query_typologies_server::QueryTypologiesServer,
+ },
},
};
use warden_middleware::grpc::interceptor::MyInterceptor;
@@ -44,6 +48,14 @@ pub fn serve(state: AppHandle) -> Result<(axum::Router, axum::Router), AppError>
state.clone(),
MyInterceptor,
))
+ .add_service(QueryTypologiesServer::with_interceptor(
+ state.clone(),
+ MyInterceptor,
+ ))
+ .add_service(MutateTypologiesServer::with_interceptor(
+ state.clone(),
+ MyInterceptor,
+ ))
.add_service(routing_reflector)
.into_axum_router()
.layer(
diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs
index 423d67d..2997e13 100644
--- a/crates/configuration/src/server/http_svc.rs
+++ b/crates/configuration/src/server/http_svc.rs
@@ -63,3 +63,47 @@ pub fn build_router(state: AppHandle) -> Router {
warden_middleware::apply(router)
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::Body,
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn health_check(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+ let app = build_router(state);
+
+ let response = app
+ .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap())
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::OK);
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs
index 92f3184..64fc4c3 100644
--- a/crates/configuration/src/server/http_svc/routes.rs
+++ b/crates/configuration/src/server/http_svc/routes.rs
@@ -31,3 +31,17 @@ pub fn router(store: AppHandle) -> OpenApiRouter {
))
.with_state(store)
}
+
+#[cfg(test)]
+pub(crate) fn test_config() -> warden_stack::Configuration {
+ use warden_stack::Configuration;
+
+ let config_path = "warden-config.toml";
+
+ let config = config::Config::builder()
+ .add_source(config::File::new(config_path, config::FileFormat::Toml))
+ .build()
+ .unwrap();
+
+ config.try_deserialize::<Configuration>().unwrap()
+}
diff --git a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
index 4875a80..6974c6d 100644
--- a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
+++ b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs
@@ -37,3 +37,55 @@ pub async fn active_routing(
.into_inner();
Ok(axum::Json(config.configuration).into_response())
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::Body,
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn get_empty(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+ let app = build_router(state);
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("GET")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/routing")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::OK);
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs b/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs
index 3578b65..ce9ba37 100644
--- a/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs
+++ b/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs
@@ -37,3 +37,102 @@ pub async fn post_routing(
Ok((axum::http::StatusCode::CREATED, axum::Json(response)))
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::Body,
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn create_routing(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+ let app = build_router(state);
+
+ let routing = serde_json::json!({
+ "active": true,
+ "name": "Public Network Map",
+ "version": "1.0.0",
+ "messages": [
+ {
+ "id": "004",
+ "version": "1.0.0",
+ "tx_tp": "pacs.002.001.12",
+ "typologies": [
+ {
+ "id": "999",
+ "version": "1.0.0",
+ "rules": [
+ {
+ "id": "901",
+ "version": "1.0.0"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ });
+
+ let body = serde_json::to_vec(&routing).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/routing")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::CREATED);
+
+ // should have an active one
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("GET")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/routing")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::OK);
+
+ // let bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
+ // let routing_info: RoutingConfiguration = serde_json::from_slice(&bytes).unwrap();
+ //
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/rule.rs b/crates/configuration/src/server/http_svc/routes/rule.rs
index f4b0d33..597693f 100644
--- a/crates/configuration/src/server/http_svc/routes/rule.rs
+++ b/crates/configuration/src/server/http_svc/routes/rule.rs
@@ -2,3 +2,185 @@ pub mod create;
pub mod delete;
pub mod get;
pub mod update;
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::{self, Body},
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_core::configuration::rule::RuleConfiguration;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn all_operations(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+
+ let app = build_router(state);
+
+ let rule = serde_json::json!({
+ "id": "901",
+ "version": "1.0.0",
+ "description": "Number of outgoing transactions - debtor",
+ "configuration": {
+ "parameters": {
+ "max_query_range": 86400000
+ },
+ "exit_conditions": [
+ {
+ "sub_rule_ref": ".x00",
+ "reason": "Incoming transaction is unsuccessful"
+ }
+ ],
+ "bands": [
+ {
+ "sub_rule_ref": ".01",
+ "upper_limit": 2,
+ "reason": "The debtor has performed one transaction to date"
+ },
+ {
+ "sub_rule_ref": ".02",
+ "lower_limit": 2,
+ "upper_limit": 3,
+ "reason": "The debtor has performed two transactions to date"
+ },
+ {
+ "sub_rule_ref": ".03",
+ "lower_limit": 3,
+ "reason": "The debtor has performed three or more transactions to date"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&rule).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::CREATED);
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("GET")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule?id=901&version=1.0.0")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let body = body::to_bytes(response.into_body(), usize::MAX)
+ .await
+ .unwrap();
+
+ let config: RuleConfiguration = serde_json::from_slice(&body).unwrap();
+
+ assert_eq!(&config.id, "901");
+ assert_eq!(&config.version, "1.0.0");
+
+ let rule = serde_json::json!({
+ "id": "902",
+ "version": "1.0.0",
+ "description": "Number of outgoing transactions - debtor",
+ "configuration": {
+ "parameters": {
+ "max_query_range": 86400000
+ },
+ "exit_conditions": [
+ {
+ "sub_rule_ref": ".x00",
+ "reason": "Incoming transaction is unsuccessful"
+ }
+ ],
+ "bands": []
+ }
+ });
+
+ let body = serde_json::to_vec(&rule).unwrap();
+
+ app.clone()
+ .oneshot(
+ Request::builder()
+ .method("PUT")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule?id=901&version=1.0.0")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("GET")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule?id=902&version=1.0.0")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let body = body::to_bytes(response.into_body(), usize::MAX)
+ .await
+ .unwrap();
+
+ let config: RuleConfiguration = serde_json::from_slice(&body).unwrap();
+
+ assert_eq!(&config.id, "902");
+ assert!(&config.configuration.unwrap().bands.is_empty());
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("DELETE")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule?id=902&version=1.0.0")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::OK);
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/rule/create.rs b/crates/configuration/src/server/http_svc/routes/rule/create.rs
index 809c00b..f7aba5b 100644
--- a/crates/configuration/src/server/http_svc/routes/rule/create.rs
+++ b/crates/configuration/src/server/http_svc/routes/rule/create.rs
@@ -36,3 +36,93 @@ pub async fn create_rule(
.into_inner();
Ok((axum::http::StatusCode::CREATED, axum::Json(response)))
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::Body,
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn post_rule(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+
+ let app = build_router(state);
+
+ let rule = serde_json::json!({
+ "id": "901",
+ "version": "1.0.0",
+ "description": "Number of outgoing transactions - debtor",
+ "configuration": {
+ "parameters": {
+ "max_query_range": 86400000
+ },
+ "exit_conditions": [
+ {
+ "sub_rule_ref": ".x00",
+ "reason": "Incoming transaction is unsuccessful"
+ }
+ ],
+ "bands": [
+ {
+ "sub_rule_ref": ".01",
+ "upper_limit": 2,
+ "reason": "The debtor has performed one transaction to date"
+ },
+ {
+ "sub_rule_ref": ".02",
+ "lower_limit": 2,
+ "upper_limit": 3,
+ "reason": "The debtor has performed two transactions to date"
+ },
+ {
+ "sub_rule_ref": ".03",
+ "lower_limit": 3,
+ "reason": "The debtor has performed three or more transactions to date"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&rule).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::CREATED);
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/rule/delete.rs b/crates/configuration/src/server/http_svc/routes/rule/delete.rs
index 2352fba..0182e47 100644
--- a/crates/configuration/src/server/http_svc/routes/rule/delete.rs
+++ b/crates/configuration/src/server/http_svc/routes/rule/delete.rs
@@ -40,3 +40,105 @@ pub async fn delete_rule_config(
Ok(axum::Json(body))
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::Body,
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn delete_rule(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+
+ let app = build_router(state);
+
+ let rule = serde_json::json!({
+ "id": "901",
+ "version": "1.0.0",
+ "description": "Number of outgoing transactions - debtor",
+ "configuration": {
+ "parameters": {
+ "max_query_range": 86400000
+ },
+ "exit_conditions": [
+ {
+ "sub_rule_ref": ".x00",
+ "reason": "Incoming transaction is unsuccessful"
+ }
+ ],
+ "bands": [
+ {
+ "sub_rule_ref": ".01",
+ "upper_limit": 2,
+ "reason": "The debtor has performed one transaction to date"
+ },
+ {
+ "sub_rule_ref": ".02",
+ "lower_limit": 2,
+ "upper_limit": 3,
+ "reason": "The debtor has performed two transactions to date"
+ },
+ {
+ "sub_rule_ref": ".03",
+ "lower_limit": 3,
+ "reason": "The debtor has performed three or more transactions to date"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&rule).unwrap();
+
+ app.clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("DELETE")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule?id=901&version=1.0.0")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::OK);
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/rule/get.rs b/crates/configuration/src/server/http_svc/routes/rule/get.rs
index 935eefb..eccab62 100644
--- a/crates/configuration/src/server/http_svc/routes/rule/get.rs
+++ b/crates/configuration/src/server/http_svc/routes/rule/get.rs
@@ -40,3 +40,116 @@ pub async fn get_rule(
Ok(axum::Json(response))
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::{self, Body},
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_core::configuration::rule::RuleConfiguration;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn get(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+
+ let app = build_router(state);
+
+ let rule = serde_json::json!({
+ "id": "901",
+ "version": "1.0.0",
+ "description": "Number of outgoing transactions - debtor",
+ "configuration": {
+ "parameters": {
+ "max_query_range": 86400000
+ },
+ "exit_conditions": [
+ {
+ "sub_rule_ref": ".x00",
+ "reason": "Incoming transaction is unsuccessful"
+ }
+ ],
+ "bands": [
+ {
+ "sub_rule_ref": ".01",
+ "upper_limit": 2,
+ "reason": "The debtor has performed one transaction to date"
+ },
+ {
+ "sub_rule_ref": ".02",
+ "lower_limit": 2,
+ "upper_limit": 3,
+ "reason": "The debtor has performed two transactions to date"
+ },
+ {
+ "sub_rule_ref": ".03",
+ "lower_limit": 3,
+ "reason": "The debtor has performed three or more transactions to date"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&rule).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::CREATED);
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("GET")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule?id=901&version=1.0.0")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let body = body::to_bytes(response.into_body(), usize::MAX)
+ .await
+ .unwrap();
+
+ let config: RuleConfiguration = serde_json::from_slice(&body).unwrap();
+
+ assert_eq!(&config.id, "901");
+ assert_eq!(&config.version, "1.0.0");
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/rule/update.rs b/crates/configuration/src/server/http_svc/routes/rule/update.rs
index 7bf3fe0..2f61bcb 100644
--- a/crates/configuration/src/server/http_svc/routes/rule/update.rs
+++ b/crates/configuration/src/server/http_svc/routes/rule/update.rs
@@ -1,7 +1,8 @@
-use axum::extract::State;
+use axum::extract::{Query, State};
use tonic::IntoRequest;
use warden_core::configuration::rule::{
- RuleConfiguration, UpdateRuleRequest, mutate_rule_configuration_server::MutateRuleConfiguration,
+ RuleConfiguration, RuleConfigurationRequest, UpdateRuleRequest,
+ mutate_rule_configuration_server::MutateRuleConfiguration,
};
use crate::{
@@ -15,6 +16,7 @@ use crate::{
path = "/{version}/rule",
params(
("version" = Version, Path, description = "API version, e.g., v1, v2, v3"),
+ RuleConfigurationRequest
),
responses((
status = OK,
@@ -22,18 +24,21 @@ use crate::{
)),
operation_id = "update rule configuration", // https://github.com/juhaku/utoipa/issues/1170
tag = TAG_RULES,
- )
+)
]
#[axum::debug_handler]
#[tracing::instrument(skip(state))]
pub async fn update_rule_config(
version: Version,
+ Query(params): Query<RuleConfigurationRequest>,
State(state): State<AppHandle>,
axum::Json(body): axum::Json<RuleConfiguration>,
) -> Result<axum::Json<RuleConfiguration>, AppError> {
let config = state
.update_rule_configuration(
UpdateRuleRequest {
+ id: params.id,
+ version: params.version,
configuration: Some(body),
}
.into_request(),
@@ -43,3 +48,125 @@ pub async fn update_rule_config(
Ok(axum::Json(config))
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::Body,
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn update(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+
+ let app = build_router(state);
+
+ let rule = serde_json::json!({
+ "id": "901",
+ "version": "1.0.0",
+ "description": "Number of outgoing transactions - debtor",
+ "configuration": {
+ "parameters": {
+ "max_query_range": 86400000
+ },
+ "exit_conditions": [
+ {
+ "sub_rule_ref": ".x00",
+ "reason": "Incoming transaction is unsuccessful"
+ }
+ ],
+ "bands": [
+ {
+ "sub_rule_ref": ".01",
+ "upper_limit": 2,
+ "reason": "The debtor has performed one transaction to date"
+ },
+ {
+ "sub_rule_ref": ".02",
+ "lower_limit": 2,
+ "upper_limit": 3,
+ "reason": "The debtor has performed two transactions to date"
+ },
+ {
+ "sub_rule_ref": ".03",
+ "lower_limit": 3,
+ "reason": "The debtor has performed three or more transactions to date"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&rule).unwrap();
+
+ app.clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let rule = serde_json::json!({
+ "id": "902",
+ "version": "1.0.0",
+ "description": "Number of outgoing transactions - debtor",
+ "configuration": {
+ "parameters": {
+ "max_query_range": 86400000
+ },
+ "exit_conditions": [
+ {
+ "sub_rule_ref": ".x00",
+ "reason": "Incoming transaction is unsuccessful"
+ }
+ ],
+ "bands": []
+ }
+ });
+
+ let body = serde_json::to_vec(&rule).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("PUT")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/rule?id=901&version=1.0.0")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::OK);
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/typology.rs b/crates/configuration/src/server/http_svc/routes/typology.rs
index 85a593b..c2fa270 100644
--- a/crates/configuration/src/server/http_svc/routes/typology.rs
+++ b/crates/configuration/src/server/http_svc/routes/typology.rs
@@ -2,3 +2,227 @@ pub mod create_typology;
pub mod delete_typology;
pub mod get_typology;
pub mod post_typology;
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::{self, Body},
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_core::configuration::typology::TypologyConfiguration;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn all_operations(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+
+ let app = build_router(state);
+
+ let typology = serde_json::json!({
+ "description": "Test description",
+ "typology_name": "Rule-901-Typology-999",
+ "id": "999",
+ "version": "1.0.0",
+ "workflow": {
+ "alert_threshold": 200,
+ "interdiction_threshold": 400
+ },
+ "rules": [
+ {
+ "id": "901",
+ "version": "1.0.0",
+ "wghts": [
+ {
+ "ref": ".err",
+ "wght": 0
+ },
+ {
+ "ref": ".x00",
+ "wght": 100
+ },
+ {
+ "ref": ".01",
+ "wght": 100
+ },
+ {
+ "ref": ".02",
+ "wght": 200
+ },
+ {
+ "ref": ".03",
+ "wght": 400
+ }
+ ]
+ }
+ ],
+ "expression": {
+ "operator": "ADD",
+ "terms": [
+ {
+ "id": "901",
+ "version": "1.0.0"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&typology).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::CREATED);
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("GET")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology?id=999&version=1.0.0")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let body = body::to_bytes(response.into_body(), usize::MAX)
+ .await
+ .unwrap();
+
+ let config: TypologyConfiguration = serde_json::from_slice(&body).unwrap();
+
+ assert_eq!(&config.id, "999");
+ assert_eq!(&config.version, "1.0.0");
+
+ let typology = serde_json::json!({
+ "description": "Test description",
+ "typology_name": "Rule-901-Typology-999",
+ "id": "901",
+ "version": "1.0.0",
+ "workflow": {
+ "alert_threshold": 200,
+ "interdiction_threshold": 400
+ },
+ "rules": [
+ {
+ "id": "901",
+ "version": "1.0.0",
+ "wghts": [
+ {
+ "ref": ".err",
+ "wght": 0
+ },
+ {
+ "ref": ".x00",
+ "wght": 100
+ },
+ {
+ "ref": ".01",
+ "wght": 100
+ },
+ {
+ "ref": ".02",
+ "wght": 200
+ },
+ {
+ "ref": ".03",
+ "wght": 400
+ }
+ ]
+ }
+ ],
+ "expression": {
+ "operator": "ADD",
+ "terms": [
+ {
+ "id": "901",
+ "version": "1.0.0"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&typology).unwrap();
+
+ app.clone()
+ .oneshot(
+ Request::builder()
+ .method("PUT")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology?id=999&version=1.0.0")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("GET")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology?id=901&version=1.0.0")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let body = body::to_bytes(response.into_body(), usize::MAX)
+ .await
+ .unwrap();
+
+ let config: TypologyConfiguration = serde_json::from_slice(&body).unwrap();
+
+ assert_eq!(&config.id, "901");
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("DELETE")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology?id=901&version=1.0.0")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::OK);
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs
index 9f4985a..8aeeeec 100644
--- a/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs
+++ b/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs
@@ -36,3 +36,106 @@ pub async fn create_typology(
.into_inner();
Ok((axum::http::StatusCode::CREATED, axum::Json(response)))
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::Body,
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn post_typology(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+
+ let app = build_router(state);
+
+ let typology = serde_json::json!({
+ "description": "Test description",
+ "typology_name": "Rule-901-Typology-999",
+ "id": "999",
+ "version": "1.0.0",
+ "workflow": {
+ "alert_threshold": 200,
+ "interdiction_threshold": 400
+ },
+ "rules": [
+ {
+ "id": "901",
+ "version": "1.0.0",
+ "wghts": [
+ {
+ "ref": ".err",
+ "wght": 0
+ },
+ {
+ "ref": ".x00",
+ "wght": 100
+ },
+ {
+ "ref": ".01",
+ "wght": 100
+ },
+ {
+ "ref": ".02",
+ "wght": 200
+ },
+ {
+ "ref": ".03",
+ "wght": 400
+ }
+ ]
+ }
+ ],
+ "expression": {
+ "operator": "ADD",
+ "terms": [
+ {
+ "id": "901",
+ "version": "1.0.0"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&typology).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::CREATED);
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs
index 0e85e29..276aa67 100644
--- a/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs
+++ b/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs
@@ -39,3 +39,119 @@ pub async fn delete_typology(
Ok(axum::Json(config))
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::Body,
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn delete_typology(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+
+ let app = build_router(state);
+
+ let typology = serde_json::json!({
+ "description": "Test description",
+ "typology_name": "Rule-901-Typology-999",
+ "id": "999",
+ "version": "1.0.0",
+ "workflow": {
+ "alert_threshold": 200,
+ "interdiction_threshold": 400
+ },
+ "rules": [
+ {
+ "id": "901",
+ "version": "1.0.0",
+ "wghts": [
+ {
+ "ref": ".err",
+ "wght": 0
+ },
+ {
+ "ref": ".x00",
+ "wght": 100
+ },
+ {
+ "ref": ".01",
+ "wght": 100
+ },
+ {
+ "ref": ".02",
+ "wght": 200
+ },
+ {
+ "ref": ".03",
+ "wght": 400
+ }
+ ]
+ }
+ ],
+ "expression": {
+ "operator": "ADD",
+ "terms": [
+ {
+ "id": "901",
+ "version": "1.0.0"
+ }
+ ]
+ }
+
+ });
+
+ let body = serde_json::to_vec(&typology).unwrap();
+
+ app.clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("DELETE")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology?id=999&version=1.0.0")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::OK);
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs
index 4962593..40d3faf 100644
--- a/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs
+++ b/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs
@@ -38,3 +38,129 @@ pub async fn get_typology(
Ok(axum::Json(config.configuration))
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::{self, Body},
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_core::configuration::typology::TypologyConfiguration;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn get(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+
+ let app = build_router(state);
+
+ let typology = serde_json::json!({
+ "description": "Test description",
+ "typology_name": "Rule-901-Typology-999",
+ "id": "999",
+ "version": "1.0.0",
+ "workflow": {
+ "alert_threshold": 200,
+ "interdiction_threshold": 400
+ },
+ "rules": [
+ {
+ "id": "901",
+ "version": "1.0.0",
+ "wghts": [
+ {
+ "ref": ".err",
+ "wght": 0
+ },
+ {
+ "ref": ".x00",
+ "wght": 100
+ },
+ {
+ "ref": ".01",
+ "wght": 100
+ },
+ {
+ "ref": ".02",
+ "wght": 200
+ },
+ {
+ "ref": ".03",
+ "wght": 400
+ }
+ ]
+ }
+ ],
+ "expression": {
+ "operator": "ADD",
+ "terms": [
+ {
+ "id": "901",
+ "version": "1.0.0"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&typology).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::CREATED);
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("GET")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology?id=999&version=1.0.0")
+ .body(Body::empty())
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let body = body::to_bytes(response.into_body(), usize::MAX)
+ .await
+ .unwrap();
+
+ let config: TypologyConfiguration = serde_json::from_slice(&body).unwrap();
+
+ assert_eq!(&config.id, "999");
+ assert_eq!(&config.version, "1.0.0");
+ }
+}
diff --git a/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs
index 2864372..0c432ab 100644
--- a/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs
+++ b/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs
@@ -1,6 +1,7 @@
-use axum::extract::State;
+use axum::extract::{Query, State};
use warden_core::configuration::typology::{
- TypologyConfiguration, UpdateTypologyConfigRequest, mutate_typologies_server::MutateTypologies,
+ TypologyConfiguration, TypologyConfigurationRequest, UpdateTypologyConfigRequest,
+ mutate_typologies_server::MutateTypologies,
};
use crate::{
@@ -14,6 +15,7 @@ use crate::{
path = "/{version}/typology",
params(
("version" = Version, Path, description = "API version, e.g., v1, v2, v3"),
+ TypologyConfigurationRequest,
),
responses((
status = OK,
@@ -26,14 +28,183 @@ use crate::{
#[axum::debug_handler]
#[tracing::instrument(skip(state))]
pub async fn update(
+ version: Version,
+ Query(params): Query<TypologyConfigurationRequest>,
State(state): State<AppHandle>,
axum::Json(body): axum::Json<TypologyConfiguration>,
) -> Result<axum::Json<TypologyConfiguration>, AppError> {
let response = state
.update_typology_configuration(tonic::Request::new(UpdateTypologyConfigRequest {
+ id: params.id,
+ version: params.version,
configuration: Some(body),
}))
.await?
.into_inner();
Ok(axum::Json(response))
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::Body,
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::http_svc::{build_router, routes::test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn update(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+
+ let app = build_router(state);
+
+ let typology = serde_json::json!({
+ "description": "Test description",
+ "typology_name": "Rule-901-Typology-999",
+ "id": "999",
+ "version": "1.0.0",
+ "workflow": {
+ "alert_threshold": 200,
+ "interdiction_threshold": 400
+ },
+ "rules": [
+ {
+ "id": "901",
+ "version": "1.0.0",
+ "wghts": [
+ {
+ "ref": ".err",
+ "wght": 0
+ },
+ {
+ "ref": ".x00",
+ "wght": 100
+ },
+ {
+ "ref": ".01",
+ "wght": 100
+ },
+ {
+ "ref": ".02",
+ "wght": 200
+ },
+ {
+ "ref": ".03",
+ "wght": 400
+ }
+ ]
+ }
+ ],
+ "expression": {
+ "operator": "ADD",
+ "terms": [
+ {
+ "id": "901",
+ "version": "1.0.0"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&typology).unwrap();
+
+ app.clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let typology = serde_json::json!({
+ "description": "Test description",
+ "typology_name": "Rule-901-Typology-999",
+ "id": "901",
+ "version": "1.0.0",
+ "workflow": {
+ "alert_threshold": 200,
+ "interdiction_threshold": 400
+ },
+ "rules": [
+ {
+ "id": "901",
+ "version": "1.0.0",
+ "wghts": [
+ {
+ "ref": ".err",
+ "wght": 0
+ },
+ {
+ "ref": ".x00",
+ "wght": 100
+ },
+ {
+ "ref": ".01",
+ "wght": 100
+ },
+ {
+ "ref": ".02",
+ "wght": 200
+ },
+ {
+ "ref": ".03",
+ "wght": 400
+ }
+ ]
+ }
+ ],
+ "expression": {
+ "operator": "ADD",
+ "terms": [
+ {
+ "id": "901",
+ "version": "1.0.0"
+ }
+ ]
+ }
+ });
+
+ let body = serde_json::to_vec(&typology).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("PUT")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/typology?id=999&version=1.0.0")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::OK);
+ }
+}
diff --git a/crates/configuration/src/state/rule/mutate_rule.rs b/crates/configuration/src/state/rule/mutate_rule.rs
index 9c4f393..10898b6 100644
--- a/crates/configuration/src/state/rule/mutate_rule.rs
+++ b/crates/configuration/src/state/rule/mutate_rule.rs
@@ -73,8 +73,8 @@ impl MutateRuleConfiguration for AppHandle {
where id = $2 and version = $3
"#,
sqlx::types::Json(&config) as _,
- config.id,
- config.id,
+ request.id,
+ request.version,
)
.execute(&self.services.postgres)
.instrument(span)
diff --git a/crates/configuration/src/state/typology/mutate_typology.rs b/crates/configuration/src/state/typology/mutate_typology.rs
index f2ab2cc..2673297 100644
--- a/crates/configuration/src/state/typology/mutate_typology.rs
+++ b/crates/configuration/src/state/typology/mutate_typology.rs
@@ -73,8 +73,8 @@ impl MutateTypologies for AppHandle {
where id = $2 and version = $3
"#,
sqlx::types::Json(&config) as _,
- config.id,
- config.version,
+ request.id,
+ request.version,
)
.execute(&self.services.postgres)
.instrument(span)
diff --git a/crates/rule-executor/.env.example b/crates/rule-executor/.env.example
new file mode 100644
index 0000000..ae98d94
--- /dev/null
+++ b/crates/rule-executor/.env.example
@@ -0,0 +1 @@
+DATABASE_URL="postgres://postgres:password@localhost:5432/database"
diff --git a/crates/rule-executor/Cargo.toml b/crates/rule-executor/Cargo.toml
index 3bb9561..614faf0 100644
--- a/crates/rule-executor/Cargo.toml
+++ b/crates/rule-executor/Cargo.toml
@@ -19,6 +19,14 @@ opentelemetry-semantic-conventions.workspace = true
prost.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
+sqlx = { workspace = true, features = [
+ "bigdecimal",
+ "macros",
+ "postgres",
+ "runtime-tokio",
+ "time",
+ "tls-rustls",
+] }
time = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = [
"macros",
diff --git a/crates/rule-executor/src/main.rs b/crates/rule-executor/src/main.rs
index ed284c6..abae26d 100644
--- a/crates/rule-executor/src/main.rs
+++ b/crates/rule-executor/src/main.rs
@@ -1,4 +1,3 @@
-#[allow(dead_code)]
mod cnfg;
mod processor;
@@ -47,6 +46,9 @@ async fn main() -> Result<()> {
.nats_jetstream(&config.nats)
.await
.inspect_err(|e| error!("nats: {e}"))?
+ .postgres(&config.database)
+ .await
+ .inspect_err(|e| error!("postgres: {e}"))?
.build();
let jetstream = services
@@ -54,7 +56,15 @@ async fn main() -> Result<()> {
.take()
.ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
- let services = state::Services { jetstream };
+ let postgres = services
+ .postgres
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("database is not ready"))?;
+
+ let services = state::Services {
+ jetstream,
+ postgres,
+ };
processor::serve(services, config, provider)
.await
diff --git a/crates/rule-executor/src/processor/publish.rs b/crates/rule-executor/src/processor/publish.rs
index 8b13789..0d35977 100644
--- a/crates/rule-executor/src/processor/publish.rs
+++ b/crates/rule-executor/src/processor/publish.rs
@@ -1 +1,45 @@
+use warden_stack::tracing::telemetry::nats::injector;
+use opentelemetry::global;
+use opentelemetry_semantic_conventions::attribute;
+use tracing::{Instrument, Span, debug, info_span, warn};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::message::Payload;
+
+use crate::state::AppHandle;
+
+pub(super) async fn to_typologies(
+ subject: &str,
+ state: AppHandle,
+ payload: Payload,
+) -> anyhow::Result<()> {
+ // send transaction to next with nats
+ let subject = format!("{}.{}", state.config.nats.destination_prefix, subject);
+ debug!(subject = ?subject, "publishing");
+
+ let payload = prost::Message::encode_to_vec(&payload);
+
+ let mut headers = async_nats::HeaderMap::new();
+
+ let cx = Span::current().context();
+
+ global::get_text_map_propagator(|propagator| {
+ propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers))
+ });
+
+ let span = info_span!("nats.publish");
+ span.set_attribute(
+ attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
+ subject.to_string(),
+ );
+ span.set_attribute("otel.kind", "producer");
+ state
+ .services
+ .jetstream
+ .publish_with_headers(subject.clone(), headers, payload.into())
+ .instrument(span)
+ .await
+ .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?;
+
+ Ok(())
+}
diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs
index 3a54424..6eaf25c 100644
--- a/crates/rule-executor/src/processor/rule.rs
+++ b/crates/rule-executor/src/processor/rule.rs
@@ -2,15 +2,17 @@ use std::sync::Arc;
use anyhow::Result;
mod configuration;
+mod determine_outcome;
+mod rule_901;
use async_nats::jetstream;
use opentelemetry::global;
-use tracing::{Span, error, instrument, warn};
+use tracing::{Span, debug, error, instrument, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use warden_core::{configuration::rule::RuleConfigurationRequest, message::Payload};
use warden_stack::tracing::telemetry::nats;
-use crate::state::AppHandle;
+use crate::{processor::publish, state::AppHandle};
#[instrument(
skip(message, state),
@@ -27,7 +29,7 @@ pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Resu
span.set_parent(context);
};
- let payload: Payload = prost::Message::decode(message.payload.as_ref())?;
+ let mut payload: Payload = prost::Message::decode(message.payload.as_ref())?;
if payload.transaction.is_none() {
warn!("transaction is empty - proceeding with ack");
@@ -58,10 +60,23 @@ pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Resu
span.record("rule_id", &req.id);
span.record("rule_version", &req.version);
- let _rule_configuration = configuration::get_configuration(req, Arc::clone(&state))
+ let config = configuration::get_configuration(req, Arc::clone(&state))
.await
.unwrap();
+ match rule_901::process_901(&config, &payload, state.clone()).await {
+ Ok(res) => {
+ debug!(outcome = ?res.reason, "rule executed");
+ payload.rule_result = Some(res);
+ publish::to_typologies(&config.id, state, payload)
+ .await
+ .inspect_err(|e| error!("{e}"))?;
+ }
+ Err(e) => {
+ error!("{e}");
+ }
+ };
+
if let Err(e) = message.ack().await {
error!("ack error {e:?}");
};
diff --git a/crates/rule-executor/src/processor/rule/configuration.rs b/crates/rule-executor/src/processor/rule/configuration.rs
index 5f384aa..d8579e6 100644
--- a/crates/rule-executor/src/processor/rule/configuration.rs
+++ b/crates/rule-executor/src/processor/rule/configuration.rs
@@ -37,10 +37,8 @@ pub(super) async fn get_configuration(
.configuration
.ok_or_else(|| anyhow!("missing configuration"))?;
- println!("inserting");
let cache = state.local_cache.write().await;
cache.insert(request, config.clone()).await;
- println!("inserted");
Ok(config)
}
diff --git a/crates/rule-executor/src/processor/rule/determine_outcome.rs b/crates/rule-executor/src/processor/rule/determine_outcome.rs
new file mode 100644
index 0000000..727846d
--- /dev/null
+++ b/crates/rule-executor/src/processor/rule/determine_outcome.rs
@@ -0,0 +1,119 @@
+use tracing::trace;
+use warden_core::{configuration::rule::Band, message::RuleResult};
+
+pub(super) fn determine_outcome(value: i64, bands: &[Band], rule_result: &mut RuleResult) {
+ trace!("calculating outcome");
+ for band in bands {
+ let value_f64 = value as f64;
+
+ if band.lower_limit.is_none_or(|lower| value_f64 >= lower)
+ && band.upper_limit.is_none_or(|upper| value_f64 < upper)
+ {
+ rule_result.sub_rule_ref = band.sub_rule_ref.to_owned();
+ rule_result.reason = band.reason.to_owned();
+ break;
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn make_band(lower: Option<f64>, upper: Option<f64>, sub_ref: &str, reason: &str) -> Band {
+ Band {
+ lower_limit: lower,
+ upper_limit: upper,
+ sub_rule_ref: sub_ref.to_string(),
+ reason: reason.to_string(),
+ }
+ }
+
+ #[test]
+ fn matches_band_within_limits() {
+ let bands = vec![
+ make_band(Some(0.0), Some(10.0), "A", "Between 0 and 10"),
+ make_band(Some(10.0), Some(20.0), "B", "Between 10 and 20"),
+ ];
+ let mut rule_result = RuleResult::default();
+
+ determine_outcome(5, &bands, &mut rule_result);
+
+ assert_eq!(rule_result.sub_rule_ref, "A");
+ assert_eq!(rule_result.reason, "Between 0 and 10");
+ }
+
+ #[test]
+ fn matches_band_lower_inclusive_upper_exclusive() {
+ let bands = vec![
+ make_band(Some(0.0), Some(10.0), "A", "Between 0 and 10"),
+ make_band(Some(10.0), Some(20.0), "B", "Between 10 and 20"),
+ ];
+ let mut rule_result = RuleResult::default();
+
+ determine_outcome(10, &bands, &mut rule_result);
+
+ assert_eq!(rule_result.sub_rule_ref, "B");
+ assert_eq!(rule_result.reason, "Between 10 and 20");
+ }
+
+ #[test]
+ fn no_match_when_above_all_bands() {
+ let bands = vec![
+ make_band(Some(0.0), Some(10.0), "A", "Between 0 and 10"),
+ make_band(Some(10.0), Some(20.0), "B", "Between 10 and 20"),
+ ];
+ let mut rule_result = RuleResult::default();
+
+ determine_outcome(30, &bands, &mut rule_result);
+
+ assert_eq!(rule_result, RuleResult::default());
+ }
+
+ #[test]
+ fn match_when_no_upper_limit() {
+ let bands = vec![make_band(Some(0.0), None, "A", "Above 0")];
+ let mut rule_result = RuleResult::default();
+
+ determine_outcome(100, &bands, &mut rule_result);
+
+ assert_eq!(rule_result.sub_rule_ref, "A");
+ assert_eq!(rule_result.reason, "Above 0");
+ }
+
+ #[test]
+ fn match_when_no_lower_limit() {
+ let bands = vec![make_band(None, Some(50.0), "A", "Below 50")];
+ let mut rule_result = RuleResult::default();
+
+ determine_outcome(-10, &bands, &mut rule_result);
+
+ assert_eq!(rule_result.sub_rule_ref, "A");
+ assert_eq!(rule_result.reason, "Below 50");
+ }
+
+ #[test]
+ fn match_when_no_limits() {
+ let bands = vec![make_band(None, None, "A", "Any value")];
+ let mut rule_result = RuleResult::default();
+
+ determine_outcome(9999, &bands, &mut rule_result);
+
+ assert_eq!(rule_result.sub_rule_ref, "A");
+ assert_eq!(rule_result.reason, "Any value");
+ }
+
+ #[test]
+ fn stops_after_first_match() {
+ let bands = vec![
+ make_band(None, None, "A", "Any value"),
+ make_band(None, None, "B", "Second band"),
+ ];
+ let mut rule_result = RuleResult::default();
+
+ determine_outcome(5, &bands, &mut rule_result);
+
+ assert_eq!(rule_result.sub_rule_ref, "A");
+ assert_eq!(rule_result.reason, "Any value");
+ }
+}
diff --git a/crates/rule-executor/src/processor/rule/rule_901.rs b/crates/rule-executor/src/processor/rule/rule_901.rs
new file mode 100644
index 0000000..8dfe036
--- /dev/null
+++ b/crates/rule-executor/src/processor/rule/rule_901.rs
@@ -0,0 +1,126 @@
+use anyhow::{Result, anyhow};
+use determine_outcome::determine_outcome;
+use opentelemetry_semantic_conventions::attribute;
+use serde::Deserialize;
+use sqlx::types::BigDecimal;
+use time::OffsetDateTime;
+use tracing::{Instrument, error, info_span, trace};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use warden_core::{
+ configuration::rule::RuleConfiguration,
+ iso20022::TransactionType,
+ message::{Payload, RuleResult},
+};
+
+use crate::{processor::rule::determine_outcome, state::AppHandle};
+
+#[derive(Deserialize)]
+pub struct Parameters {
+ max_query_range: f64,
+}
+
+pub(super) async fn process_901(
+ configuration: &RuleConfiguration,
+ payload: &Payload,
+ state: AppHandle,
+) -> Result<RuleResult> {
+ let mut rule_result = RuleResult {
+ id: configuration.id.to_string(),
+ version: configuration.version.to_string(),
+ ..Default::default()
+ };
+ let c = configuration.configuration.as_ref();
+
+ let bands = c
+ .and_then(|value| {
+ if value.bands.is_empty() {
+ None
+ } else {
+ Some(&value.bands)
+ }
+ })
+ .ok_or_else(|| anyhow!("no bands available"))?;
+
+ let exit_conditions = c
+ .and_then(|value| {
+ if value.exit_conditions.is_empty() {
+ None
+ } else {
+ Some(&value.exit_conditions)
+ }
+ })
+ .ok_or_else(|| anyhow!("no exit conditions available"))?;
+
+ let parameters = c
+ .and_then(|value| value.parameters.as_ref())
+ .ok_or_else(|| anyhow!("no parameters available"))?;
+
+ let params: Parameters = serde_json::from_value(parameters.clone().into())
+ .inspect_err(|e| error!("failed to deserailise params: {e:?}"))?;
+
+ let unsuccessful_transaction = exit_conditions
+ .iter()
+ .find(|value| value.sub_rule_ref.eq(".x00"));
+
+ if let Some(warden_core::message::payload::Transaction::Pacs002(pacs002_document)) =
+ payload.transaction.as_ref()
+ {
+ let tx_sts = pacs002_document
+ .f_i_to_f_i_pmt_sts_rpt
+ .tx_inf_and_sts
+ .first()
+ .ok_or_else(|| anyhow::anyhow!("tx sts to be there"))?;
+
+ if tx_sts.tx_sts().ne("ACCC") {
+ let unsuccessful_transaction = unsuccessful_transaction
+ .ok_or_else(|| anyhow::anyhow!("no unsuccessful transaction ref"))?;
+ rule_result.reason = unsuccessful_transaction.reason.to_owned();
+ rule_result.sub_rule_ref = unsuccessful_transaction.sub_rule_ref.to_owned();
+
+ return Ok(rule_result);
+ }
+
+ let current_pacs002_timeframe: OffsetDateTime = pacs002_document
+ .f_i_to_f_i_pmt_sts_rpt
+ .grp_hdr
+ .cre_dt_tm
+ .try_into()?;
+
+ let data_cache = payload
+ .data_cache
+ .as_ref()
+ .ok_or_else(|| anyhow::anyhow!("data cache is missing"))?;
+
+ let range = BigDecimal::try_from(params.max_query_range)?;
+
+ let tx_tp = TransactionType::PACS002.to_string();
+
+ let span = info_span!("rule.logic");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "901");
+ span.set_attribute("otel.kind", "client");
+
+ trace!("executing rule query");
+ let recent_transactions = sqlx::query_scalar!(
+ "select count(*) from transaction_relationship tr
+ where tr.destination = $1
+ and tr.tx_tp = $2
+ and extract(epoch from ($3::timestamptz - tr.cre_dt_tm)) * 1000 <= $4
+ and tr.cre_dt_tm <= $3::timestamptz",
+ data_cache.dbtr_acct_id,
+ tx_tp,
+ current_pacs002_timeframe,
+ range,
+ )
+ .fetch_one(&state.services.postgres)
+ .instrument(span)
+ .await?
+ .ok_or_else(|| anyhow::anyhow!("no data"))?;
+
+ determine_outcome(recent_transactions, bands.as_ref(), &mut rule_result);
+
+ Ok(rule_result)
+ } else {
+ Err(anyhow::anyhow!("no valid transaction"))
+ }
+}
diff --git a/crates/rule-executor/src/state.rs b/crates/rule-executor/src/state.rs
index efad4ea..432068e 100644
--- a/crates/rule-executor/src/state.rs
+++ b/crates/rule-executor/src/state.rs
@@ -9,7 +9,7 @@ use warden_core::configuration::rule::{
RuleConfiguration, RuleConfigurationRequest,
query_rule_configuration_client::QueryRuleConfigurationClient,
};
-use warden_stack::Configuration;
+use warden_stack::{Configuration, sqlx::PgPool};
use crate::cnfg::LocalConfig;
use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor};
@@ -17,6 +17,7 @@ use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor};
#[derive(Clone)]
pub struct Services {
pub jetstream: Context,
+ pub postgres: PgPool,
}
pub type AppHandle = Arc<AppState>;
diff --git a/crates/typologies/src/main.rs b/crates/typologies/src/main.rs
index ea7843a..e96f6bb 100644
--- a/crates/typologies/src/main.rs
+++ b/crates/typologies/src/main.rs
@@ -45,6 +45,9 @@ async fn main() -> Result<()> {
.nats_jetstream(&config.nats)
.await
.inspect_err(|e| error!("nats: {e}"))?
+ .cache(&config.cache)
+ .await
+ .inspect_err(|e| error!("cache: {e}"))?
.build();
let jetstream = services
diff --git a/crates/typologies/src/processor/typology.rs b/crates/typologies/src/processor/typology.rs
index b1b2592..62e7089 100644
--- a/crates/typologies/src/processor/typology.rs
+++ b/crates/typologies/src/processor/typology.rs
@@ -83,7 +83,7 @@ pub async fn process_typology(
Ok(())
}
-#[instrument(skip(typology_result, routing, payload, state), err(Debug))]
+#[instrument(skip(routing, payload, state), err(Debug))]
async fn evaluate_typology(
typology_result: &mut [TypologyResult],
routing: &RoutingConfiguration,
diff --git a/crates/warden/src/server.rs b/crates/warden/src/server.rs
index 760138d..3dd170b 100644
--- a/crates/warden/src/server.rs
+++ b/crates/warden/src/server.rs
@@ -68,3 +68,233 @@ pub(crate) fn test_config() -> warden_stack::Configuration {
config.try_deserialize::<Configuration>().unwrap()
}
+
+#[cfg(test)]
+pub(crate) fn generate_id() -> String {
+ let id = uuid::Uuid::new_v4().to_string();
+ id.replace("-", "")
+}
+
+#[cfg(test)]
+pub(crate) fn test_pacs008() -> warden_core::iso20022::pacs008::Pacs008Document {
+ let msg_id = generate_id();
+ let cre_dt_tm = time::OffsetDateTime::now_utc()
+ .format(&time::format_description::well_known::Rfc3339)
+ .unwrap();
+ let end_to_end_id = generate_id();
+
+ let debtor_fsp = "fsp001";
+ let creditor_fsp = "fsp002";
+
+ let ccy = "XTS";
+
+ let v = serde_json::json!({
+ "f_i_to_f_i_cstmr_cdt_trf": {
+ "grp_hdr": {
+ "msg_id": msg_id,
+ "cre_dt_tm": cre_dt_tm,
+ "nb_of_txs": "CLRG",
+ "sttlm_inf": {
+ "sttlm_mtd": 1
+ }
+ },
+ "splmtry_data": [],
+ "cdt_trf_tx_inf": [
+ {
+ "pmt_id": {
+ "instr_id": generate_id(),
+ "end_to_end_id": end_to_end_id
+ },
+ "intr_bk_sttlm_amt": {
+ "value": 294.3,
+ "ccy": ccy,
+ },
+ "instd_amt": {
+ "value": 294.3,
+ "ccy": ccy
+ },
+ "xchg_rate": 1,
+ "chrg_br": 1,
+ "chrgs_inf": [
+ {
+ "amt": {
+ "value": 0,
+ "ccy": ccy
+ },
+ "agt": {
+ "fin_instn_id": {
+ "clr_sys_mmb_id": {
+ "mmb_id": debtor_fsp,
+ }
+ }
+ }
+ }
+ ],
+ "initg_pty": {
+ "nm": "April Blake Grant",
+ "id": {
+ "org_id": {
+ "othr": []
+ },
+ "prvt_id": {
+ "dt_and_plc_of_birth": {
+ "birth_dt": "1968-02-01",
+ "city_of_birth": "Unknown",
+ "ctry_of_birth": "ZZ"
+ },
+ "othr": [
+ {
+ "id": "+27730975224",
+ "schme_nm": {
+ "prtry": "MSISDN",
+ "cd": "cd-value"
+ }
+ }
+ ]
+ }
+ },
+ "ctct_dtls": {
+ "mob_nb": "+27-730975224",
+ "othr": []
+ }
+ },
+ "dbtr": {
+ "nm": "April Blake Grant",
+ "id": {
+ "org_id": {
+ "othr": []
+ },
+ "prvt_id": {
+ "dt_and_plc_of_birth": {
+ "birth_dt": "2000-07-23",
+ "city_of_birth": "Unknown",
+ "ctry_of_birth": "ZZ"
+ },
+ "othr": [
+ {
+ "id": generate_id(),
+ "schme_nm": {
+ "prtry": "EID",
+ "cd": "cd-value"
+ }
+ }
+ ]
+ }
+ },
+ "ctct_dtls": {
+ "mob_nb": "+27-730975224",
+ "othr": []
+ }
+ },
+ "dbtr_acct": {
+ "id": {
+ "i_b_a_n": "value",
+ "othr": {
+ "id": generate_id(),
+ "schme_nm": {
+ "prtry": "MSISDN",
+ "cd": "value"
+ }
+ }
+ },
+ "nm": "April Grant"
+ },
+ "dbtr_agt": {
+ "fin_instn_id": {
+ "clr_sys_mmb_id": {
+ "mmb_id": debtor_fsp,
+ }
+ }
+ },
+ "cdtr_agt": {
+ "fin_instn_id": {
+ "clr_sys_mmb_id": {
+ "mmb_id": creditor_fsp,
+ }
+ }
+ },
+ "cdtr": {
+ "nm": "Felicia Easton Quill",
+ "id": {
+ "org_id": {
+ "othr": []
+ },
+ "prvt_id": {
+ "dt_and_plc_of_birth": {
+ "birth_dt": "1935-05-08",
+ "city_of_birth": "Unknown",
+ "ctry_of_birth": "ZZ"
+ },
+ "othr": [
+ {
+ "id": generate_id(),
+ "schme_nm": {
+ "prtry": "EID",
+ "cd": ""
+ }
+ }
+ ]
+ }
+ },
+ "ctct_dtls": {
+ "mob_nb": "+27-707650428",
+ "othr": []
+ }
+ },
+ "cdtr_acct": {
+ "id": {
+ "i_b_a_n": "",
+ "othr": {
+ "id": generate_id(),
+ "schme_nm": {
+ "prtry": "MSISDN",
+ "cd": "acc"
+ }
+ }
+ },
+ "nm": "Felicia Quill"
+ },
+ "purp": {
+ "cd": "MP2P",
+ "prtry": ""
+ },
+ "rgltry_rptg": [
+ {
+ "dtls": [
+ {
+ "tp": "BALANCE OF PAYMENTS",
+ "cd": "100",
+ "inf": []
+ }
+ ]
+ }
+ ],
+ "rmt_inf": {
+ "ustrd": [],
+ "strd": []
+ },
+ "splmtry_data": [
+ {
+ "envlp": {
+ "doc": {
+ "xprtn": "2021-11-30T10:38:56.000Z",
+ "initg_pty": {
+ "glctn": {
+ "lat": "-3.1609",
+ "long": "38.3588"
+ }
+ }
+ }
+ }
+ }
+ ],
+ "instr_for_cdtr_agt": [],
+ "instr_for_nxt_agt": [],
+ "rltd_rmt_inf": []
+ }
+ ]
+ }
+ });
+
+ serde_json::from_value(v).unwrap()
+}
diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs
index 8d94ca9..af3f06f 100644
--- a/crates/warden/src/server/routes/processor/pacs008.rs
+++ b/crates/warden/src/server/routes/processor/pacs008.rs
@@ -62,22 +62,22 @@ pub(super) async fn post_pacs008(
);
}
- // take the first
+ // take the first - guaranteed by utoipa
trace!("extracting first credit transfer transaction info");
- let cdt_trf_tx_inf = transaction.f_i_to_f_i_cstmr_cdt_trf.cdt_trf_tx_inf.first();
-
- let amount = cdt_trf_tx_inf.and_then(|value| value.instd_amt.as_ref().map(|value| value.value));
+ let cdt_trf_tx_inf = transaction
+ .f_i_to_f_i_cstmr_cdt_trf
+ .cdt_trf_tx_inf
+ .first()
+ .expect("required cdt_trf_tx_inf missing");
- let ccy =
- cdt_trf_tx_inf.and_then(|value| value.instd_amt.as_ref().map(|value| value.ccy.as_str()));
+ let amount = cdt_trf_tx_inf.instd_amt.as_ref().map(|value| value.value);
- let end_to_end_id = cdt_trf_tx_inf
+ let ccy = cdt_trf_tx_inf
+ .instd_amt
.as_ref()
- .map(|value| value.pmt_id.end_to_end_id.as_str())
- .ok_or_else(|| {
- error!("missing end_to_end_id");
- anyhow::anyhow!("missing end_to_end_id id")
- })?;
+ .map(|value| value.ccy.as_str());
+
+ let end_to_end_id = cdt_trf_tx_inf.pmt_id.end_to_end_id.as_str();
tracing::Span::current().record("end_to_end_id", end_to_end_id);
let end_to_end_id = String::from(end_to_end_id);
@@ -85,12 +85,10 @@ pub(super) async fn post_pacs008(
let msg_id = &transaction.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id;
tracing::Span::current().record("msg_id", msg_id);
- let pmt_inf_id = cdt_trf_tx_inf
- .and_then(|value| value.pmt_id.instr_id.as_ref())
- .ok_or_else(|| {
- error!("missing pmt_inf_id");
- anyhow::anyhow!("missing pmt_inf_id id")
- })?;
+ let pmt_inf_id = cdt_trf_tx_inf.pmt_id.instr_id.as_ref().ok_or_else(|| {
+ error!("missing pmt_inf_id");
+ anyhow::anyhow!("missing pmt_inf_id id")
+ })?;
debug!(%msg_id, %end_to_end_id, "extracted transaction identifiers");
@@ -324,11 +322,10 @@ mod tests {
use sqlx::PgPool;
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use tower::ServiceExt;
- use uuid::Uuid;
use warden_stack::cache::RedisManager;
use crate::{
- server::{self, test_config},
+ server::{self, generate_id, metrics_app, test_config},
state::{AppState, Services},
};
@@ -350,226 +347,31 @@ mod tests {
)
.await
.unwrap();
- let app = server::router(state);
-
- let ccy = "XTS";
+ let app = server::router(state).merge(metrics_app());
- let msg_id = generate_id();
- let cre_dt_tm = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
+ let pacs = server::test_pacs008();
- let debtor_fsp = "fsp001";
- let creditor_fsp = "fsp002";
+ let inf = &pacs.f_i_to_f_i_cstmr_cdt_trf.cdt_trf_tx_inf[0];
+ let ccy = &inf.intr_bk_sttlm_amt.as_ref().unwrap().ccy;
+ let end_to_end_id = &inf.pmt_id.end_to_end_id;
+ let debtor_fsp = &inf.chrgs_inf[0]
+ .agt
+ .fin_instn_id
+ .clr_sys_mmb_id
+ .as_ref()
+ .unwrap()
+ .mmb_id;
+ let creditor_fsp = &inf
+ .cdtr_agt
+ .as_ref()
+ .unwrap()
+ .fin_instn_id
+ .clr_sys_mmb_id
+ .as_ref()
+ .unwrap()
+ .mmb_id;
- let end_to_end_id = generate_id();
-
- let v = serde_json::json!({
- "f_i_to_f_i_cstmr_cdt_trf": {
- "grp_hdr": {
- "msg_id": msg_id,
- "cre_dt_tm": cre_dt_tm,
- "nb_of_txs": "CLRG",
- "sttlm_inf": {
- "sttlm_mtd": 1
- }
- },
- "splmtry_data": [],
- "cdt_trf_tx_inf": [
- {
- "pmt_id": {
- "instr_id": generate_id(),
- "end_to_end_id": end_to_end_id
- },
- "intr_bk_sttlm_amt": {
- "value": 294.3,
- "ccy": ccy,
- },
- "instd_amt": {
- "value": 294.3,
- "ccy": ccy
- },
- "xchg_rate": 1,
- "chrg_br": 1,
- "chrgs_inf": [
- {
- "amt": {
- "value": 0,
- "ccy": ccy
- },
- "agt": {
- "fin_instn_id": {
- "clr_sys_mmb_id": {
- "mmb_id": debtor_fsp,
- }
- }
- }
- }
- ],
- "initg_pty": {
- "nm": "April Blake Grant",
- "id": {
- "org_id": {
- "othr": []
- },
- "prvt_id": {
- "dt_and_plc_of_birth": {
- "birth_dt": "1968-02-01",
- "city_of_birth": "Unknown",
- "ctry_of_birth": "ZZ"
- },
- "othr": [
- {
- "id": "+27730975224",
- "schme_nm": {
- "prtry": "MSISDN",
- "cd": "cd-value"
- }
- }
- ]
- }
- },
- "ctct_dtls": {
- "mob_nb": "+27-730975224",
- "othr": []
- }
- },
- "dbtr": {
- "nm": "April Blake Grant",
- "id": {
- "org_id": {
- "othr": []
- },
- "prvt_id": {
- "dt_and_plc_of_birth": {
- "birth_dt": "2000-07-23",
- "city_of_birth": "Unknown",
- "ctry_of_birth": "ZZ"
- },
- "othr": [
- {
- "id": generate_id(),
- "schme_nm": {
- "prtry": "EID",
- "cd": "cd-value"
- }
- }
- ]
- }
- },
- "ctct_dtls": {
- "mob_nb": "+27-730975224",
- "othr": []
- }
- },
- "dbtr_acct": {
- "id": {
- "i_b_a_n": "value",
- "othr": {
- "id": generate_id(),
- "schme_nm": {
- "prtry": "MSISDN",
- "cd": "value"
- }
- }
- },
- "nm": "April Grant"
- },
- "dbtr_agt": {
- "fin_instn_id": {
- "clr_sys_mmb_id": {
- "mmb_id": debtor_fsp,
- }
- }
- },
- "cdtr_agt": {
- "fin_instn_id": {
- "clr_sys_mmb_id": {
- "mmb_id": creditor_fsp,
- }
- }
- },
- "cdtr": {
- "nm": "Felicia Easton Quill",
- "id": {
- "org_id": {
- "othr": []
- },
- "prvt_id": {
- "dt_and_plc_of_birth": {
- "birth_dt": "1935-05-08",
- "city_of_birth": "Unknown",
- "ctry_of_birth": "ZZ"
- },
- "othr": [
- {
- "id": generate_id(),
- "schme_nm": {
- "prtry": "EID",
- "cd": ""
- }
- }
- ]
- }
- },
- "ctct_dtls": {
- "mob_nb": "+27-707650428",
- "othr": []
- }
- },
- "cdtr_acct": {
- "id": {
- "i_b_a_n": "",
- "othr": {
- "id": generate_id(),
- "schme_nm": {
- "prtry": "MSISDN",
- "cd": "acc"
- }
- }
- },
- "nm": "Felicia Quill"
- },
- "purp": {
- "cd": "MP2P",
- "prtry": ""
- },
- "rgltry_rptg": [
- {
- "dtls": [
- {
- "tp": "BALANCE OF PAYMENTS",
- "cd": "100",
- "inf": []
- }
- ]
- }
- ],
- "rmt_inf": {
- "ustrd": [],
- "strd": []
- },
- "splmtry_data": [
- {
- "envlp": {
- "doc": {
- "xprtn": "2021-11-30T10:38:56.000Z",
- "initg_pty": {
- "glctn": {
- "lat": "-3.1609",
- "long": "38.3588"
- }
- }
- }
- }
- }
- ],
- "instr_for_cdtr_agt": [],
- "instr_for_nxt_agt": [],
- "rltd_rmt_inf": []
- }
- ]
- }
- });
- let body = serde_json::to_vec(&v).unwrap();
+ let body = serde_json::to_vec(&pacs).unwrap();
let response = app
.clone()
@@ -586,12 +388,93 @@ mod tests {
assert_eq!(response.status(), StatusCode::CREATED);
- post_clearance(app, &end_to_end_id, ccy, debtor_fsp, creditor_fsp).await;
+ post_clearance(app, end_to_end_id, ccy, debtor_fsp, creditor_fsp).await;
+ }
+
+ #[sqlx::test]
+ async fn post_missing_e2e(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+ let app = server::router(state);
+ // no end to end id
+
+ let mut pacs = server::test_pacs008();
+ pacs.f_i_to_f_i_cstmr_cdt_trf.cdt_trf_tx_inf = vec![];
+
+ let body = serde_json::to_vec(&pacs).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/pacs008")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
- fn generate_id() -> String {
- let id = Uuid::new_v4().to_string();
- id.replace("-", "")
+ #[sqlx::test]
+ async fn post_missing_pmt_id(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+ let app = server::router(state);
+ // no end to end id
+
+ let mut pacs = server::test_pacs008();
+ pacs.f_i_to_f_i_cstmr_cdt_trf.cdt_trf_tx_inf[0]
+ .pmt_id
+ .instr_id = None;
+
+ let body = serde_json::to_vec(&pacs).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v0/pacs008")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
async fn post_clearance(
diff --git a/crates/warden/src/version.rs b/crates/warden/src/version.rs
index 4eb5677..1af6891 100644
--- a/crates/warden/src/version.rs
+++ b/crates/warden/src/version.rs
@@ -33,3 +33,275 @@ where
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use axum::{
+ body::Body,
+ http::{Request, StatusCode},
+ };
+ use sqlx::PgPool;
+ use time::{OffsetDateTime, format_description::well_known::Rfc3339};
+ use tower::ServiceExt;
+ use warden_stack::cache::RedisManager;
+
+ use crate::{
+ server::{self, generate_id, test_config},
+ state::{AppState, Services},
+ };
+
+ #[sqlx::test]
+ async fn invalid_version(pool: PgPool) {
+ let config = test_config();
+
+ let cache = RedisManager::new(&config.cache).await.unwrap();
+ let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap();
+ let jetstream = async_nats::jetstream::new(client);
+
+ let state = AppState::create(
+ Services {
+ postgres: pool,
+ cache,
+ jetstream,
+ },
+ &test_config(),
+ )
+ .await
+ .unwrap();
+ let app = server::router(state);
+
+ let ccy = "XTS";
+
+ let msg_id = generate_id();
+ let cre_dt_tm = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
+
+ let debtor_fsp = "fsp001";
+ let creditor_fsp = "fsp002";
+
+ let end_to_end_id = generate_id();
+
+ let v = serde_json::json!({
+ "f_i_to_f_i_cstmr_cdt_trf": {
+ "grp_hdr": {
+ "msg_id": msg_id,
+ "cre_dt_tm": cre_dt_tm,
+ "nb_of_txs": "CLRG",
+ "sttlm_inf": {
+ "sttlm_mtd": 1
+ }
+ },
+ "splmtry_data": [],
+ "cdt_trf_tx_inf": [
+ {
+ "pmt_id": {
+ "instr_id": generate_id(),
+ "end_to_end_id": end_to_end_id
+ },
+ "intr_bk_sttlm_amt": {
+ "value": 294.3,
+ "ccy": ccy,
+ },
+ "instd_amt": {
+ "value": 294.3,
+ "ccy": ccy
+ },
+ "xchg_rate": 1,
+ "chrg_br": 1,
+ "chrgs_inf": [
+ {
+ "amt": {
+ "value": 0,
+ "ccy": ccy
+ },
+ "agt": {
+ "fin_instn_id": {
+ "clr_sys_mmb_id": {
+ "mmb_id": debtor_fsp,
+ }
+ }
+ }
+ }
+ ],
+ "initg_pty": {
+ "nm": "April Blake Grant",
+ "id": {
+ "org_id": {
+ "othr": []
+ },
+ "prvt_id": {
+ "dt_and_plc_of_birth": {
+ "birth_dt": "1968-02-01",
+ "city_of_birth": "Unknown",
+ "ctry_of_birth": "ZZ"
+ },
+ "othr": [
+ {
+ "id": "+27730975224",
+ "schme_nm": {
+ "prtry": "MSISDN",
+ "cd": "cd-value"
+ }
+ }
+ ]
+ }
+ },
+ "ctct_dtls": {
+ "mob_nb": "+27-730975224",
+ "othr": []
+ }
+ },
+ "dbtr": {
+ "nm": "April Blake Grant",
+ "id": {
+ "org_id": {
+ "othr": []
+ },
+ "prvt_id": {
+ "dt_and_plc_of_birth": {
+ "birth_dt": "2000-07-23",
+ "city_of_birth": "Unknown",
+ "ctry_of_birth": "ZZ"
+ },
+ "othr": [
+ {
+ "id": generate_id(),
+ "schme_nm": {
+ "prtry": "EID",
+ "cd": "cd-value"
+ }
+ }
+ ]
+ }
+ },
+ "ctct_dtls": {
+ "mob_nb": "+27-730975224",
+ "othr": []
+ }
+ },
+ "dbtr_acct": {
+ "id": {
+ "i_b_a_n": "value",
+ "othr": {
+ "id": generate_id(),
+ "schme_nm": {
+ "prtry": "MSISDN",
+ "cd": "value"
+ }
+ }
+ },
+ "nm": "April Grant"
+ },
+ "dbtr_agt": {
+ "fin_instn_id": {
+ "clr_sys_mmb_id": {
+ "mmb_id": debtor_fsp,
+ }
+ }
+ },
+ "cdtr_agt": {
+ "fin_instn_id": {
+ "clr_sys_mmb_id": {
+ "mmb_id": creditor_fsp,
+ }
+ }
+ },
+ "cdtr": {
+ "nm": "Felicia Easton Quill",
+ "id": {
+ "org_id": {
+ "othr": []
+ },
+ "prvt_id": {
+ "dt_and_plc_of_birth": {
+ "birth_dt": "1935-05-08",
+ "city_of_birth": "Unknown",
+ "ctry_of_birth": "ZZ"
+ },
+ "othr": [
+ {
+ "id": generate_id(),
+ "schme_nm": {
+ "prtry": "EID",
+ "cd": ""
+ }
+ }
+ ]
+ }
+ },
+ "ctct_dtls": {
+ "mob_nb": "+27-707650428",
+ "othr": []
+ }
+ },
+ "cdtr_acct": {
+ "id": {
+ "i_b_a_n": "",
+ "othr": {
+ "id": generate_id(),
+ "schme_nm": {
+ "prtry": "MSISDN",
+ "cd": "acc"
+ }
+ }
+ },
+ "nm": "Felicia Quill"
+ },
+ "purp": {
+ "cd": "MP2P",
+ "prtry": ""
+ },
+ "rgltry_rptg": [
+ {
+ "dtls": [
+ {
+ "tp": "BALANCE OF PAYMENTS",
+ "cd": "100",
+ "inf": []
+ }
+ ]
+ }
+ ],
+ "rmt_inf": {
+ "ustrd": [],
+ "strd": []
+ },
+ "splmtry_data": [
+ {
+ "envlp": {
+ "doc": {
+ "xprtn": "2021-11-30T10:38:56.000Z",
+ "initg_pty": {
+ "glctn": {
+ "lat": "-3.1609",
+ "long": "38.3588"
+ }
+ }
+ }
+ }
+ }
+ ],
+ "instr_for_cdtr_agt": [],
+ "instr_for_nxt_agt": [],
+ "rltd_rmt_inf": []
+ }
+ ]
+ }
+ });
+ let body = serde_json::to_vec(&v).unwrap();
+
+ let response = app
+ .clone()
+ .oneshot(
+ Request::builder()
+ .method("POST")
+ .header("Content-Type", "application/json")
+ .uri("/api/v99/pacs008")
+ .body(Body::from(body))
+ .unwrap(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(response.status(), StatusCode::NOT_FOUND);
+ }
+}