diff options
42 files changed, 2682 insertions, 285 deletions
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8183a72..c13f63e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,6 +12,53 @@ concurrency: name: ci jobs: + clippy: + runs-on: ubuntu-latest + permissions: + security-events: write # to upload sarif results + name: beta / clippy + steps: + - uses: actions/checkout@v5 + with: + submodules: true + - uses: dtolnay/rust-toolchain@master + with: + toolchain: beta + components: clippy,rustfmt + - uses: Swatinem/rust-cache@v2 + - run: cargo install clippy-sarif sarif-fmt + - name: install protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Run rust-clippy + run: + cargo clippy + --workspace + --all-features + --message-format=json | clippy-sarif | tee rust-clippy-results.sarif | sarif-fmt + continue-on-error: true + - name: Upload analysis results to GitHub + uses: github/codeql-action/upload-sarif@v3 + with: + sarif_file: rust-clippy-results.sarif + wait-for-processing: true + + + fmt: + runs-on: ubuntu-latest + name: stable / fmt + steps: + - uses: actions/checkout@v5 + with: + submodules: true + - name: Install stable + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt + - name: cargo fmt --check + run: cargo fmt --check + os-check: runs-on: ${{ matrix.os }} name: ${{ matrix.os }} / stable diff --git a/.sqlx/query-32cb5438dd7289fd29e38dc545bf8ca74ef9cbcdc6f51642ef21db4444d1db15.json b/.sqlx/query-32cb5438dd7289fd29e38dc545bf8ca74ef9cbcdc6f51642ef21db4444d1db15.json new file mode 100644 index 0000000..3115b0d --- /dev/null +++ b/.sqlx/query-32cb5438dd7289fd29e38dc545bf8ca74ef9cbcdc6f51642ef21db4444d1db15.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "select count(*) from transaction_relationship tr\n where tr.destination = $1\n and tr.tx_tp = $2\n and extract(epoch from ($3::timestamptz - tr.cre_dt_tm)) * 1000 <= $4\n and tr.cre_dt_tm <= $3::timestamptz", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Timestamptz", + "Numeric" + ] + }, + "nullable": [ + null + ] + }, + "hash": "32cb5438dd7289fd29e38dc545bf8ca74ef9cbcdc6f51642ef21db4444d1db15" +} @@ -318,6 +318,19 @@ dependencies = [ ] [[package]] +name = "bigdecimal" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a22f228ab7a1b23027ccc6c350b72868017af7ea8356fbdf19f8d991c690013" +dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", +] + +[[package]] name = "bitflags" version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2463,6 +2476,7 @@ dependencies = [ "prost 0.14.1", "serde", "serde_json", + "sqlx", "time", "tokio", "tonic 0.14.1", @@ -2712,9 +2726,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.142" +version = "1.0.143" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "030fedb782600dcbd6f02d479bf0d817ac3bb40d644745b769d6a96bc3afc5a7" +checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a" dependencies = [ "itoa", "memchr", @@ -2933,6 +2947,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" dependencies = [ "base64", + "bigdecimal", "bytes", "crc", "crossbeam-queue", @@ -3010,6 +3025,7 @@ checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" dependencies = [ "atoi", "base64", + "bigdecimal", "bitflags", "byteorder", "bytes", @@ -3054,6 +3070,7 @@ checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" dependencies = [ "atoi", "base64", + "bigdecimal", "bitflags", "byteorder", "crc", @@ -3070,6 +3087,7 @@ dependencies = [ "log", "md-5", "memchr", + "num-bigint", "once_cell", "rand 0.8.5", "serde", diff --git a/contrib/docker-compose/compose-monitoring.yaml b/contrib/docker-compose/compose-monitoring.yaml index ef1d1d7..9982d9b 100644 --- a/contrib/docker-compose/compose-monitoring.yaml +++ b/contrib/docker-compose/compose-monitoring.yaml @@ -18,7 +18,7 @@ services: - warden grafana: - image: grafana/grafana:12.1.0 + image: grafana/grafana:12.1.1 environment: - GF_PATHS_PROVISIONING=/etc/grafana/provisioning - GF_AUTH_ANONYMOUS_ENABLED=true diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index b290f08..a77a215 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -54,6 +54,9 @@ redoc = ["dep:utoipa-redoc", "utoipa-redoc/axum"] rapidoc = ["dep:utoipa-rapidoc", "utoipa-rapidoc/axum"] scalar = ["dep:utoipa-scalar", "utoipa-scalar/axum"] +[dev-dependencies] +tower = { workspace = true, features = ["util"] } + [dependencies.warden-stack] workspace = true features = ["api", "cache", "nats-jetstream", "postgres", "opentelemetry-tonic", "tracing-loki"] diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs index e31fc60..e31a57b 100644 --- a/crates/configuration/src/server.rs +++ b/crates/configuration/src/server.rs @@ -16,6 +16,10 @@ use warden_core::{ mutate_rule_configuration_server::MutateRuleConfigurationServer, query_rule_configuration_server::QueryRuleConfigurationServer, }, + typology::{ + mutate_typologies_server::MutateTypologiesServer, + query_typologies_server::QueryTypologiesServer, + }, }, }; use warden_middleware::grpc::interceptor::MyInterceptor; @@ -44,6 +48,14 @@ pub fn serve(state: AppHandle) -> Result<(axum::Router, axum::Router), AppError> state.clone(), MyInterceptor, )) + .add_service(QueryTypologiesServer::with_interceptor( + state.clone(), + MyInterceptor, + )) + .add_service(MutateTypologiesServer::with_interceptor( + state.clone(), + MyInterceptor, + )) .add_service(routing_reflector) .into_axum_router() .layer( diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs index 423d67d..2997e13 100644 --- a/crates/configuration/src/server/http_svc.rs +++ b/crates/configuration/src/server/http_svc.rs @@ -63,3 +63,47 @@ pub fn build_router(state: AppHandle) -> Router { warden_middleware::apply(router) } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn health_check(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + let app = build_router(state); + + let response = app + .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs index 92f3184..64fc4c3 100644 --- a/crates/configuration/src/server/http_svc/routes.rs +++ b/crates/configuration/src/server/http_svc/routes.rs @@ -31,3 +31,17 @@ pub fn router(store: AppHandle) -> OpenApiRouter { )) .with_state(store) } + +#[cfg(test)] +pub(crate) fn test_config() -> warden_stack::Configuration { + use warden_stack::Configuration; + + let config_path = "warden-config.toml"; + + let config = config::Config::builder() + .add_source(config::File::new(config_path, config::FileFormat::Toml)) + .build() + .unwrap(); + + config.try_deserialize::<Configuration>().unwrap() +} diff --git a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs index 4875a80..6974c6d 100644 --- a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs +++ b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs @@ -37,3 +37,55 @@ pub async fn active_routing( .into_inner(); Ok(axum::Json(config.configuration).into_response()) } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn get_empty(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + let app = build_router(state); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .header("Content-Type", "application/json") + .uri("/api/v0/routing") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs b/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs index 3578b65..ce9ba37 100644 --- a/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs +++ b/crates/configuration/src/server/http_svc/routes/routing/post_routing.rs @@ -37,3 +37,102 @@ pub async fn post_routing( Ok((axum::http::StatusCode::CREATED, axum::Json(response))) } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn create_routing(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + let app = build_router(state); + + let routing = serde_json::json!({ + "active": true, + "name": "Public Network Map", + "version": "1.0.0", + "messages": [ + { + "id": "004", + "version": "1.0.0", + "tx_tp": "pacs.002.001.12", + "typologies": [ + { + "id": "999", + "version": "1.0.0", + "rules": [ + { + "id": "901", + "version": "1.0.0" + } + ] + } + ] + } + ] + }); + + let body = serde_json::to_vec(&routing).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/routing") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::CREATED); + + // should have an active one + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .header("Content-Type", "application/json") + .uri("/api/v0/routing") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + // let bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap(); + // let routing_info: RoutingConfiguration = serde_json::from_slice(&bytes).unwrap(); + // + } +} diff --git a/crates/configuration/src/server/http_svc/routes/rule.rs b/crates/configuration/src/server/http_svc/routes/rule.rs index f4b0d33..597693f 100644 --- a/crates/configuration/src/server/http_svc/routes/rule.rs +++ b/crates/configuration/src/server/http_svc/routes/rule.rs @@ -2,3 +2,185 @@ pub mod create; pub mod delete; pub mod get; pub mod update; + +#[cfg(test)] +mod tests { + use axum::{ + body::{self, Body}, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_core::configuration::rule::RuleConfiguration; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn all_operations(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + + let app = build_router(state); + + let rule = serde_json::json!({ + "id": "901", + "version": "1.0.0", + "description": "Number of outgoing transactions - debtor", + "configuration": { + "parameters": { + "max_query_range": 86400000 + }, + "exit_conditions": [ + { + "sub_rule_ref": ".x00", + "reason": "Incoming transaction is unsuccessful" + } + ], + "bands": [ + { + "sub_rule_ref": ".01", + "upper_limit": 2, + "reason": "The debtor has performed one transaction to date" + }, + { + "sub_rule_ref": ".02", + "lower_limit": 2, + "upper_limit": 3, + "reason": "The debtor has performed two transactions to date" + }, + { + "sub_rule_ref": ".03", + "lower_limit": 3, + "reason": "The debtor has performed three or more transactions to date" + } + ] + } + }); + + let body = serde_json::to_vec(&rule).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/rule") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::CREATED); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .header("Content-Type", "application/json") + .uri("/api/v0/rule?id=901&version=1.0.0") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body = body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + + let config: RuleConfiguration = serde_json::from_slice(&body).unwrap(); + + assert_eq!(&config.id, "901"); + assert_eq!(&config.version, "1.0.0"); + + let rule = serde_json::json!({ + "id": "902", + "version": "1.0.0", + "description": "Number of outgoing transactions - debtor", + "configuration": { + "parameters": { + "max_query_range": 86400000 + }, + "exit_conditions": [ + { + "sub_rule_ref": ".x00", + "reason": "Incoming transaction is unsuccessful" + } + ], + "bands": [] + } + }); + + let body = serde_json::to_vec(&rule).unwrap(); + + app.clone() + .oneshot( + Request::builder() + .method("PUT") + .header("Content-Type", "application/json") + .uri("/api/v0/rule?id=901&version=1.0.0") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .header("Content-Type", "application/json") + .uri("/api/v0/rule?id=902&version=1.0.0") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body = body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + + let config: RuleConfiguration = serde_json::from_slice(&body).unwrap(); + + assert_eq!(&config.id, "902"); + assert!(&config.configuration.unwrap().bands.is_empty()); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("DELETE") + .header("Content-Type", "application/json") + .uri("/api/v0/rule?id=902&version=1.0.0") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/crates/configuration/src/server/http_svc/routes/rule/create.rs b/crates/configuration/src/server/http_svc/routes/rule/create.rs index 809c00b..f7aba5b 100644 --- a/crates/configuration/src/server/http_svc/routes/rule/create.rs +++ b/crates/configuration/src/server/http_svc/routes/rule/create.rs @@ -36,3 +36,93 @@ pub async fn create_rule( .into_inner(); Ok((axum::http::StatusCode::CREATED, axum::Json(response))) } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn post_rule(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + + let app = build_router(state); + + let rule = serde_json::json!({ + "id": "901", + "version": "1.0.0", + "description": "Number of outgoing transactions - debtor", + "configuration": { + "parameters": { + "max_query_range": 86400000 + }, + "exit_conditions": [ + { + "sub_rule_ref": ".x00", + "reason": "Incoming transaction is unsuccessful" + } + ], + "bands": [ + { + "sub_rule_ref": ".01", + "upper_limit": 2, + "reason": "The debtor has performed one transaction to date" + }, + { + "sub_rule_ref": ".02", + "lower_limit": 2, + "upper_limit": 3, + "reason": "The debtor has performed two transactions to date" + }, + { + "sub_rule_ref": ".03", + "lower_limit": 3, + "reason": "The debtor has performed three or more transactions to date" + } + ] + } + }); + + let body = serde_json::to_vec(&rule).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/rule") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::CREATED); + } +} diff --git a/crates/configuration/src/server/http_svc/routes/rule/delete.rs b/crates/configuration/src/server/http_svc/routes/rule/delete.rs index 2352fba..0182e47 100644 --- a/crates/configuration/src/server/http_svc/routes/rule/delete.rs +++ b/crates/configuration/src/server/http_svc/routes/rule/delete.rs @@ -40,3 +40,105 @@ pub async fn delete_rule_config( Ok(axum::Json(body)) } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn delete_rule(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + + let app = build_router(state); + + let rule = serde_json::json!({ + "id": "901", + "version": "1.0.0", + "description": "Number of outgoing transactions - debtor", + "configuration": { + "parameters": { + "max_query_range": 86400000 + }, + "exit_conditions": [ + { + "sub_rule_ref": ".x00", + "reason": "Incoming transaction is unsuccessful" + } + ], + "bands": [ + { + "sub_rule_ref": ".01", + "upper_limit": 2, + "reason": "The debtor has performed one transaction to date" + }, + { + "sub_rule_ref": ".02", + "lower_limit": 2, + "upper_limit": 3, + "reason": "The debtor has performed two transactions to date" + }, + { + "sub_rule_ref": ".03", + "lower_limit": 3, + "reason": "The debtor has performed three or more transactions to date" + } + ] + } + }); + + let body = serde_json::to_vec(&rule).unwrap(); + + app.clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/rule") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("DELETE") + .header("Content-Type", "application/json") + .uri("/api/v0/rule?id=901&version=1.0.0") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/crates/configuration/src/server/http_svc/routes/rule/get.rs b/crates/configuration/src/server/http_svc/routes/rule/get.rs index 935eefb..eccab62 100644 --- a/crates/configuration/src/server/http_svc/routes/rule/get.rs +++ b/crates/configuration/src/server/http_svc/routes/rule/get.rs @@ -40,3 +40,116 @@ pub async fn get_rule( Ok(axum::Json(response)) } + +#[cfg(test)] +mod tests { + use axum::{ + body::{self, Body}, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_core::configuration::rule::RuleConfiguration; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn get(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + + let app = build_router(state); + + let rule = serde_json::json!({ + "id": "901", + "version": "1.0.0", + "description": "Number of outgoing transactions - debtor", + "configuration": { + "parameters": { + "max_query_range": 86400000 + }, + "exit_conditions": [ + { + "sub_rule_ref": ".x00", + "reason": "Incoming transaction is unsuccessful" + } + ], + "bands": [ + { + "sub_rule_ref": ".01", + "upper_limit": 2, + "reason": "The debtor has performed one transaction to date" + }, + { + "sub_rule_ref": ".02", + "lower_limit": 2, + "upper_limit": 3, + "reason": "The debtor has performed two transactions to date" + }, + { + "sub_rule_ref": ".03", + "lower_limit": 3, + "reason": "The debtor has performed three or more transactions to date" + } + ] + } + }); + + let body = serde_json::to_vec(&rule).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/rule") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::CREATED); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .header("Content-Type", "application/json") + .uri("/api/v0/rule?id=901&version=1.0.0") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body = body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + + let config: RuleConfiguration = serde_json::from_slice(&body).unwrap(); + + assert_eq!(&config.id, "901"); + assert_eq!(&config.version, "1.0.0"); + } +} diff --git a/crates/configuration/src/server/http_svc/routes/rule/update.rs b/crates/configuration/src/server/http_svc/routes/rule/update.rs index 7bf3fe0..2f61bcb 100644 --- a/crates/configuration/src/server/http_svc/routes/rule/update.rs +++ b/crates/configuration/src/server/http_svc/routes/rule/update.rs @@ -1,7 +1,8 @@ -use axum::extract::State; +use axum::extract::{Query, State}; use tonic::IntoRequest; use warden_core::configuration::rule::{ - RuleConfiguration, UpdateRuleRequest, mutate_rule_configuration_server::MutateRuleConfiguration, + RuleConfiguration, RuleConfigurationRequest, UpdateRuleRequest, + mutate_rule_configuration_server::MutateRuleConfiguration, }; use crate::{ @@ -15,6 +16,7 @@ use crate::{ path = "/{version}/rule", params( ("version" = Version, Path, description = "API version, e.g., v1, v2, v3"), + RuleConfigurationRequest ), responses(( status = OK, @@ -22,18 +24,21 @@ use crate::{ )), operation_id = "update rule configuration", // https://github.com/juhaku/utoipa/issues/1170 tag = TAG_RULES, - ) +) ] #[axum::debug_handler] #[tracing::instrument(skip(state))] pub async fn update_rule_config( version: Version, + Query(params): Query<RuleConfigurationRequest>, State(state): State<AppHandle>, axum::Json(body): axum::Json<RuleConfiguration>, ) -> Result<axum::Json<RuleConfiguration>, AppError> { let config = state .update_rule_configuration( UpdateRuleRequest { + id: params.id, + version: params.version, configuration: Some(body), } .into_request(), @@ -43,3 +48,125 @@ pub async fn update_rule_config( Ok(axum::Json(config)) } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn update(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + + let app = build_router(state); + + let rule = serde_json::json!({ + "id": "901", + "version": "1.0.0", + "description": "Number of outgoing transactions - debtor", + "configuration": { + "parameters": { + "max_query_range": 86400000 + }, + "exit_conditions": [ + { + "sub_rule_ref": ".x00", + "reason": "Incoming transaction is unsuccessful" + } + ], + "bands": [ + { + "sub_rule_ref": ".01", + "upper_limit": 2, + "reason": "The debtor has performed one transaction to date" + }, + { + "sub_rule_ref": ".02", + "lower_limit": 2, + "upper_limit": 3, + "reason": "The debtor has performed two transactions to date" + }, + { + "sub_rule_ref": ".03", + "lower_limit": 3, + "reason": "The debtor has performed three or more transactions to date" + } + ] + } + }); + + let body = serde_json::to_vec(&rule).unwrap(); + + app.clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/rule") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + let rule = serde_json::json!({ + "id": "902", + "version": "1.0.0", + "description": "Number of outgoing transactions - debtor", + "configuration": { + "parameters": { + "max_query_range": 86400000 + }, + "exit_conditions": [ + { + "sub_rule_ref": ".x00", + "reason": "Incoming transaction is unsuccessful" + } + ], + "bands": [] + } + }); + + let body = serde_json::to_vec(&rule).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("PUT") + .header("Content-Type", "application/json") + .uri("/api/v0/rule?id=901&version=1.0.0") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/crates/configuration/src/server/http_svc/routes/typology.rs b/crates/configuration/src/server/http_svc/routes/typology.rs index 85a593b..c2fa270 100644 --- a/crates/configuration/src/server/http_svc/routes/typology.rs +++ b/crates/configuration/src/server/http_svc/routes/typology.rs @@ -2,3 +2,227 @@ pub mod create_typology; pub mod delete_typology; pub mod get_typology; pub mod post_typology; + +#[cfg(test)] +mod tests { + use axum::{ + body::{self, Body}, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_core::configuration::typology::TypologyConfiguration; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn all_operations(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + + let app = build_router(state); + + let typology = serde_json::json!({ + "description": "Test description", + "typology_name": "Rule-901-Typology-999", + "id": "999", + "version": "1.0.0", + "workflow": { + "alert_threshold": 200, + "interdiction_threshold": 400 + }, + "rules": [ + { + "id": "901", + "version": "1.0.0", + "wghts": [ + { + "ref": ".err", + "wght": 0 + }, + { + "ref": ".x00", + "wght": 100 + }, + { + "ref": ".01", + "wght": 100 + }, + { + "ref": ".02", + "wght": 200 + }, + { + "ref": ".03", + "wght": 400 + } + ] + } + ], + "expression": { + "operator": "ADD", + "terms": [ + { + "id": "901", + "version": "1.0.0" + } + ] + } + }); + + let body = serde_json::to_vec(&typology).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/typology") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::CREATED); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .header("Content-Type", "application/json") + .uri("/api/v0/typology?id=999&version=1.0.0") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body = body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + + let config: TypologyConfiguration = serde_json::from_slice(&body).unwrap(); + + assert_eq!(&config.id, "999"); + assert_eq!(&config.version, "1.0.0"); + + let typology = serde_json::json!({ + "description": "Test description", + "typology_name": "Rule-901-Typology-999", + "id": "901", + "version": "1.0.0", + "workflow": { + "alert_threshold": 200, + "interdiction_threshold": 400 + }, + "rules": [ + { + "id": "901", + "version": "1.0.0", + "wghts": [ + { + "ref": ".err", + "wght": 0 + }, + { + "ref": ".x00", + "wght": 100 + }, + { + "ref": ".01", + "wght": 100 + }, + { + "ref": ".02", + "wght": 200 + }, + { + "ref": ".03", + "wght": 400 + } + ] + } + ], + "expression": { + "operator": "ADD", + "terms": [ + { + "id": "901", + "version": "1.0.0" + } + ] + } + }); + + let body = serde_json::to_vec(&typology).unwrap(); + + app.clone() + .oneshot( + Request::builder() + .method("PUT") + .header("Content-Type", "application/json") + .uri("/api/v0/typology?id=999&version=1.0.0") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .header("Content-Type", "application/json") + .uri("/api/v0/typology?id=901&version=1.0.0") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body = body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + + let config: TypologyConfiguration = serde_json::from_slice(&body).unwrap(); + + assert_eq!(&config.id, "901"); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("DELETE") + .header("Content-Type", "application/json") + .uri("/api/v0/typology?id=901&version=1.0.0") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs index 9f4985a..8aeeeec 100644 --- a/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs +++ b/crates/configuration/src/server/http_svc/routes/typology/create_typology.rs @@ -36,3 +36,106 @@ pub async fn create_typology( .into_inner(); Ok((axum::http::StatusCode::CREATED, axum::Json(response))) } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn post_typology(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + + let app = build_router(state); + + let typology = serde_json::json!({ + "description": "Test description", + "typology_name": "Rule-901-Typology-999", + "id": "999", + "version": "1.0.0", + "workflow": { + "alert_threshold": 200, + "interdiction_threshold": 400 + }, + "rules": [ + { + "id": "901", + "version": "1.0.0", + "wghts": [ + { + "ref": ".err", + "wght": 0 + }, + { + "ref": ".x00", + "wght": 100 + }, + { + "ref": ".01", + "wght": 100 + }, + { + "ref": ".02", + "wght": 200 + }, + { + "ref": ".03", + "wght": 400 + } + ] + } + ], + "expression": { + "operator": "ADD", + "terms": [ + { + "id": "901", + "version": "1.0.0" + } + ] + } + }); + + let body = serde_json::to_vec(&typology).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/typology") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::CREATED); + } +} diff --git a/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs index 0e85e29..276aa67 100644 --- a/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs +++ b/crates/configuration/src/server/http_svc/routes/typology/delete_typology.rs @@ -39,3 +39,119 @@ pub async fn delete_typology( Ok(axum::Json(config)) } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn delete_typology(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + + let app = build_router(state); + + let typology = serde_json::json!({ + "description": "Test description", + "typology_name": "Rule-901-Typology-999", + "id": "999", + "version": "1.0.0", + "workflow": { + "alert_threshold": 200, + "interdiction_threshold": 400 + }, + "rules": [ + { + "id": "901", + "version": "1.0.0", + "wghts": [ + { + "ref": ".err", + "wght": 0 + }, + { + "ref": ".x00", + "wght": 100 + }, + { + "ref": ".01", + "wght": 100 + }, + { + "ref": ".02", + "wght": 200 + }, + { + "ref": ".03", + "wght": 400 + } + ] + } + ], + "expression": { + "operator": "ADD", + "terms": [ + { + "id": "901", + "version": "1.0.0" + } + ] + } + + }); + + let body = serde_json::to_vec(&typology).unwrap(); + + app.clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/typology") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("DELETE") + .header("Content-Type", "application/json") + .uri("/api/v0/typology?id=999&version=1.0.0") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs index 4962593..40d3faf 100644 --- a/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs +++ b/crates/configuration/src/server/http_svc/routes/typology/get_typology.rs @@ -38,3 +38,129 @@ pub async fn get_typology( Ok(axum::Json(config.configuration)) } + +#[cfg(test)] +mod tests { + use axum::{ + body::{self, Body}, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_core::configuration::typology::TypologyConfiguration; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn get(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + + let app = build_router(state); + + let typology = serde_json::json!({ + "description": "Test description", + "typology_name": "Rule-901-Typology-999", + "id": "999", + "version": "1.0.0", + "workflow": { + "alert_threshold": 200, + "interdiction_threshold": 400 + }, + "rules": [ + { + "id": "901", + "version": "1.0.0", + "wghts": [ + { + "ref": ".err", + "wght": 0 + }, + { + "ref": ".x00", + "wght": 100 + }, + { + "ref": ".01", + "wght": 100 + }, + { + "ref": ".02", + "wght": 200 + }, + { + "ref": ".03", + "wght": 400 + } + ] + } + ], + "expression": { + "operator": "ADD", + "terms": [ + { + "id": "901", + "version": "1.0.0" + } + ] + } + }); + + let body = serde_json::to_vec(&typology).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/typology") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::CREATED); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .header("Content-Type", "application/json") + .uri("/api/v0/typology?id=999&version=1.0.0") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body = body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + + let config: TypologyConfiguration = serde_json::from_slice(&body).unwrap(); + + assert_eq!(&config.id, "999"); + assert_eq!(&config.version, "1.0.0"); + } +} diff --git a/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs b/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs index 2864372..0c432ab 100644 --- a/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs +++ b/crates/configuration/src/server/http_svc/routes/typology/post_typology.rs @@ -1,6 +1,7 @@ -use axum::extract::State; +use axum::extract::{Query, State}; use warden_core::configuration::typology::{ - TypologyConfiguration, UpdateTypologyConfigRequest, mutate_typologies_server::MutateTypologies, + TypologyConfiguration, TypologyConfigurationRequest, UpdateTypologyConfigRequest, + mutate_typologies_server::MutateTypologies, }; use crate::{ @@ -14,6 +15,7 @@ use crate::{ path = "/{version}/typology", params( ("version" = Version, Path, description = "API version, e.g., v1, v2, v3"), + TypologyConfigurationRequest, ), responses(( status = OK, @@ -26,14 +28,183 @@ use crate::{ #[axum::debug_handler] #[tracing::instrument(skip(state))] pub async fn update( + version: Version, + Query(params): Query<TypologyConfigurationRequest>, State(state): State<AppHandle>, axum::Json(body): axum::Json<TypologyConfiguration>, ) -> Result<axum::Json<TypologyConfiguration>, AppError> { let response = state .update_typology_configuration(tonic::Request::new(UpdateTypologyConfigRequest { + id: params.id, + version: params.version, configuration: Some(body), })) .await? .into_inner(); Ok(axum::Json(response)) } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use tower::ServiceExt; + use warden_stack::cache::RedisManager; + + use crate::{ + server::http_svc::{build_router, routes::test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn update(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + + let app = build_router(state); + + let typology = serde_json::json!({ + "description": "Test description", + "typology_name": "Rule-901-Typology-999", + "id": "999", + "version": "1.0.0", + "workflow": { + "alert_threshold": 200, + "interdiction_threshold": 400 + }, + "rules": [ + { + "id": "901", + "version": "1.0.0", + "wghts": [ + { + "ref": ".err", + "wght": 0 + }, + { + "ref": ".x00", + "wght": 100 + }, + { + "ref": ".01", + "wght": 100 + }, + { + "ref": ".02", + "wght": 200 + }, + { + "ref": ".03", + "wght": 400 + } + ] + } + ], + "expression": { + "operator": "ADD", + "terms": [ + { + "id": "901", + "version": "1.0.0" + } + ] + } + }); + + let body = serde_json::to_vec(&typology).unwrap(); + + app.clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/typology") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + let typology = serde_json::json!({ + "description": "Test description", + "typology_name": "Rule-901-Typology-999", + "id": "901", + "version": "1.0.0", + "workflow": { + "alert_threshold": 200, + "interdiction_threshold": 400 + }, + "rules": [ + { + "id": "901", + "version": "1.0.0", + "wghts": [ + { + "ref": ".err", + "wght": 0 + }, + { + "ref": ".x00", + "wght": 100 + }, + { + "ref": ".01", + "wght": 100 + }, + { + "ref": ".02", + "wght": 200 + }, + { + "ref": ".03", + "wght": 400 + } + ] + } + ], + "expression": { + "operator": "ADD", + "terms": [ + { + "id": "901", + "version": "1.0.0" + } + ] + } + }); + + let body = serde_json::to_vec(&typology).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("PUT") + .header("Content-Type", "application/json") + .uri("/api/v0/typology?id=999&version=1.0.0") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } +} diff --git a/crates/configuration/src/state/rule/mutate_rule.rs b/crates/configuration/src/state/rule/mutate_rule.rs index 9c4f393..10898b6 100644 --- a/crates/configuration/src/state/rule/mutate_rule.rs +++ b/crates/configuration/src/state/rule/mutate_rule.rs @@ -73,8 +73,8 @@ impl MutateRuleConfiguration for AppHandle { where id = $2 and version = $3 "#, sqlx::types::Json(&config) as _, - config.id, - config.id, + request.id, + request.version, ) .execute(&self.services.postgres) .instrument(span) diff --git a/crates/configuration/src/state/typology/mutate_typology.rs b/crates/configuration/src/state/typology/mutate_typology.rs index f2ab2cc..2673297 100644 --- a/crates/configuration/src/state/typology/mutate_typology.rs +++ b/crates/configuration/src/state/typology/mutate_typology.rs @@ -73,8 +73,8 @@ impl MutateTypologies for AppHandle { where id = $2 and version = $3 "#, sqlx::types::Json(&config) as _, - config.id, - config.version, + request.id, + request.version, ) .execute(&self.services.postgres) .instrument(span) diff --git a/crates/rule-executor/.env.example b/crates/rule-executor/.env.example new file mode 100644 index 0000000..ae98d94 --- /dev/null +++ b/crates/rule-executor/.env.example @@ -0,0 +1 @@ +DATABASE_URL="postgres://postgres:password@localhost:5432/database" diff --git a/crates/rule-executor/Cargo.toml b/crates/rule-executor/Cargo.toml index 3bb9561..614faf0 100644 --- a/crates/rule-executor/Cargo.toml +++ b/crates/rule-executor/Cargo.toml @@ -19,6 +19,14 @@ opentelemetry-semantic-conventions.workspace = true prost.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true +sqlx = { workspace = true, features = [ + "bigdecimal", + "macros", + "postgres", + "runtime-tokio", + "time", + "tls-rustls", +] } time = { workspace = true, features = ["serde"] } tokio = { workspace = true, features = [ "macros", diff --git a/crates/rule-executor/src/main.rs b/crates/rule-executor/src/main.rs index ed284c6..abae26d 100644 --- a/crates/rule-executor/src/main.rs +++ b/crates/rule-executor/src/main.rs @@ -1,4 +1,3 @@ -#[allow(dead_code)] mod cnfg; mod processor; @@ -47,6 +46,9 @@ async fn main() -> Result<()> { .nats_jetstream(&config.nats) .await .inspect_err(|e| error!("nats: {e}"))? + .postgres(&config.database) + .await + .inspect_err(|e| error!("postgres: {e}"))? .build(); let jetstream = services @@ -54,7 +56,15 @@ async fn main() -> Result<()> { .take() .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; - let services = state::Services { jetstream }; + let postgres = services + .postgres + .take() + .ok_or_else(|| anyhow::anyhow!("database is not ready"))?; + + let services = state::Services { + jetstream, + postgres, + }; processor::serve(services, config, provider) .await diff --git a/crates/rule-executor/src/processor/publish.rs b/crates/rule-executor/src/processor/publish.rs index 8b13789..0d35977 100644 --- a/crates/rule-executor/src/processor/publish.rs +++ b/crates/rule-executor/src/processor/publish.rs @@ -1 +1,45 @@ +use warden_stack::tracing::telemetry::nats::injector; +use opentelemetry::global; +use opentelemetry_semantic_conventions::attribute; +use tracing::{Instrument, Span, debug, info_span, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::message::Payload; + +use crate::state::AppHandle; + +pub(super) async fn to_typologies( + subject: &str, + state: AppHandle, + payload: Payload, +) -> anyhow::Result<()> { + // send transaction to next with nats + let subject = format!("{}.{}", state.config.nats.destination_prefix, subject); + debug!(subject = ?subject, "publishing"); + + let payload = prost::Message::encode_to_vec(&payload); + + let mut headers = async_nats::HeaderMap::new(); + + let cx = Span::current().context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut injector::HeaderMap(&mut headers)) + }); + + let span = info_span!("nats.publish"); + span.set_attribute( + attribute::MESSAGING_DESTINATION_SUBSCRIPTION_NAME, + subject.to_string(), + ); + span.set_attribute("otel.kind", "producer"); + state + .services + .jetstream + .publish_with_headers(subject.clone(), headers, payload.into()) + .instrument(span) + .await + .inspect_err(|e| warn!(subject = ?subject, "failed to publish: {e}"))?; + + Ok(()) +} diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs index 3a54424..6eaf25c 100644 --- a/crates/rule-executor/src/processor/rule.rs +++ b/crates/rule-executor/src/processor/rule.rs @@ -2,15 +2,17 @@ use std::sync::Arc; use anyhow::Result; mod configuration; +mod determine_outcome; +mod rule_901; use async_nats::jetstream; use opentelemetry::global; -use tracing::{Span, error, instrument, warn}; +use tracing::{Span, debug, error, instrument, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt; use warden_core::{configuration::rule::RuleConfigurationRequest, message::Payload}; use warden_stack::tracing::telemetry::nats; -use crate::state::AppHandle; +use crate::{processor::publish, state::AppHandle}; #[instrument( skip(message, state), @@ -27,7 +29,7 @@ pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Resu span.set_parent(context); }; - let payload: Payload = prost::Message::decode(message.payload.as_ref())?; + let mut payload: Payload = prost::Message::decode(message.payload.as_ref())?; if payload.transaction.is_none() { warn!("transaction is empty - proceeding with ack"); @@ -58,10 +60,23 @@ pub async fn process_rule(message: jetstream::Message, state: AppHandle) -> Resu span.record("rule_id", &req.id); span.record("rule_version", &req.version); - let _rule_configuration = configuration::get_configuration(req, Arc::clone(&state)) + let config = configuration::get_configuration(req, Arc::clone(&state)) .await .unwrap(); + match rule_901::process_901(&config, &payload, state.clone()).await { + Ok(res) => { + debug!(outcome = ?res.reason, "rule executed"); + payload.rule_result = Some(res); + publish::to_typologies(&config.id, state, payload) + .await + .inspect_err(|e| error!("{e}"))?; + } + Err(e) => { + error!("{e}"); + } + }; + if let Err(e) = message.ack().await { error!("ack error {e:?}"); }; diff --git a/crates/rule-executor/src/processor/rule/configuration.rs b/crates/rule-executor/src/processor/rule/configuration.rs index 5f384aa..d8579e6 100644 --- a/crates/rule-executor/src/processor/rule/configuration.rs +++ b/crates/rule-executor/src/processor/rule/configuration.rs @@ -37,10 +37,8 @@ pub(super) async fn get_configuration( .configuration .ok_or_else(|| anyhow!("missing configuration"))?; - println!("inserting"); let cache = state.local_cache.write().await; cache.insert(request, config.clone()).await; - println!("inserted"); Ok(config) } diff --git a/crates/rule-executor/src/processor/rule/determine_outcome.rs b/crates/rule-executor/src/processor/rule/determine_outcome.rs new file mode 100644 index 0000000..727846d --- /dev/null +++ b/crates/rule-executor/src/processor/rule/determine_outcome.rs @@ -0,0 +1,119 @@ +use tracing::trace; +use warden_core::{configuration::rule::Band, message::RuleResult}; + +pub(super) fn determine_outcome(value: i64, bands: &[Band], rule_result: &mut RuleResult) { + trace!("calculating outcome"); + for band in bands { + let value_f64 = value as f64; + + if band.lower_limit.is_none_or(|lower| value_f64 >= lower) + && band.upper_limit.is_none_or(|upper| value_f64 < upper) + { + rule_result.sub_rule_ref = band.sub_rule_ref.to_owned(); + rule_result.reason = band.reason.to_owned(); + break; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_band(lower: Option<f64>, upper: Option<f64>, sub_ref: &str, reason: &str) -> Band { + Band { + lower_limit: lower, + upper_limit: upper, + sub_rule_ref: sub_ref.to_string(), + reason: reason.to_string(), + } + } + + #[test] + fn matches_band_within_limits() { + let bands = vec![ + make_band(Some(0.0), Some(10.0), "A", "Between 0 and 10"), + make_band(Some(10.0), Some(20.0), "B", "Between 10 and 20"), + ]; + let mut rule_result = RuleResult::default(); + + determine_outcome(5, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "A"); + assert_eq!(rule_result.reason, "Between 0 and 10"); + } + + #[test] + fn matches_band_lower_inclusive_upper_exclusive() { + let bands = vec![ + make_band(Some(0.0), Some(10.0), "A", "Between 0 and 10"), + make_band(Some(10.0), Some(20.0), "B", "Between 10 and 20"), + ]; + let mut rule_result = RuleResult::default(); + + determine_outcome(10, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "B"); + assert_eq!(rule_result.reason, "Between 10 and 20"); + } + + #[test] + fn no_match_when_above_all_bands() { + let bands = vec![ + make_band(Some(0.0), Some(10.0), "A", "Between 0 and 10"), + make_band(Some(10.0), Some(20.0), "B", "Between 10 and 20"), + ]; + let mut rule_result = RuleResult::default(); + + determine_outcome(30, &bands, &mut rule_result); + + assert_eq!(rule_result, RuleResult::default()); + } + + #[test] + fn match_when_no_upper_limit() { + let bands = vec![make_band(Some(0.0), None, "A", "Above 0")]; + let mut rule_result = RuleResult::default(); + + determine_outcome(100, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "A"); + assert_eq!(rule_result.reason, "Above 0"); + } + + #[test] + fn match_when_no_lower_limit() { + let bands = vec![make_band(None, Some(50.0), "A", "Below 50")]; + let mut rule_result = RuleResult::default(); + + determine_outcome(-10, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "A"); + assert_eq!(rule_result.reason, "Below 50"); + } + + #[test] + fn match_when_no_limits() { + let bands = vec![make_band(None, None, "A", "Any value")]; + let mut rule_result = RuleResult::default(); + + determine_outcome(9999, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "A"); + assert_eq!(rule_result.reason, "Any value"); + } + + #[test] + fn stops_after_first_match() { + let bands = vec![ + make_band(None, None, "A", "Any value"), + make_band(None, None, "B", "Second band"), + ]; + let mut rule_result = RuleResult::default(); + + determine_outcome(5, &bands, &mut rule_result); + + assert_eq!(rule_result.sub_rule_ref, "A"); + assert_eq!(rule_result.reason, "Any value"); + } +} diff --git a/crates/rule-executor/src/processor/rule/rule_901.rs b/crates/rule-executor/src/processor/rule/rule_901.rs new file mode 100644 index 0000000..8dfe036 --- /dev/null +++ b/crates/rule-executor/src/processor/rule/rule_901.rs @@ -0,0 +1,126 @@ +use anyhow::{Result, anyhow}; +use determine_outcome::determine_outcome; +use opentelemetry_semantic_conventions::attribute; +use serde::Deserialize; +use sqlx::types::BigDecimal; +use time::OffsetDateTime; +use tracing::{Instrument, error, info_span, trace}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_core::{ + configuration::rule::RuleConfiguration, + iso20022::TransactionType, + message::{Payload, RuleResult}, +}; + +use crate::{processor::rule::determine_outcome, state::AppHandle}; + +#[derive(Deserialize)] +pub struct Parameters { + max_query_range: f64, +} + +pub(super) async fn process_901( + configuration: &RuleConfiguration, + payload: &Payload, + state: AppHandle, +) -> Result<RuleResult> { + let mut rule_result = RuleResult { + id: configuration.id.to_string(), + version: configuration.version.to_string(), + ..Default::default() + }; + let c = configuration.configuration.as_ref(); + + let bands = c + .and_then(|value| { + if value.bands.is_empty() { + None + } else { + Some(&value.bands) + } + }) + .ok_or_else(|| anyhow!("no bands available"))?; + + let exit_conditions = c + .and_then(|value| { + if value.exit_conditions.is_empty() { + None + } else { + Some(&value.exit_conditions) + } + }) + .ok_or_else(|| anyhow!("no exit conditions available"))?; + + let parameters = c + .and_then(|value| value.parameters.as_ref()) + .ok_or_else(|| anyhow!("no parameters available"))?; + + let params: Parameters = serde_json::from_value(parameters.clone().into()) + .inspect_err(|e| error!("failed to deserailise params: {e:?}"))?; + + let unsuccessful_transaction = exit_conditions + .iter() + .find(|value| value.sub_rule_ref.eq(".x00")); + + if let Some(warden_core::message::payload::Transaction::Pacs002(pacs002_document)) = + payload.transaction.as_ref() + { + let tx_sts = pacs002_document + .f_i_to_f_i_pmt_sts_rpt + .tx_inf_and_sts + .first() + .ok_or_else(|| anyhow::anyhow!("tx sts to be there"))?; + + if tx_sts.tx_sts().ne("ACCC") { + let unsuccessful_transaction = unsuccessful_transaction + .ok_or_else(|| anyhow::anyhow!("no unsuccessful transaction ref"))?; + rule_result.reason = unsuccessful_transaction.reason.to_owned(); + rule_result.sub_rule_ref = unsuccessful_transaction.sub_rule_ref.to_owned(); + + return Ok(rule_result); + } + + let current_pacs002_timeframe: OffsetDateTime = pacs002_document + .f_i_to_f_i_pmt_sts_rpt + .grp_hdr + .cre_dt_tm + .try_into()?; + + let data_cache = payload + .data_cache + .as_ref() + .ok_or_else(|| anyhow::anyhow!("data cache is missing"))?; + + let range = BigDecimal::try_from(params.max_query_range)?; + + let tx_tp = TransactionType::PACS002.to_string(); + + let span = info_span!("rule.logic"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "901"); + span.set_attribute("otel.kind", "client"); + + trace!("executing rule query"); + let recent_transactions = sqlx::query_scalar!( + "select count(*) from transaction_relationship tr + where tr.destination = $1 + and tr.tx_tp = $2 + and extract(epoch from ($3::timestamptz - tr.cre_dt_tm)) * 1000 <= $4 + and tr.cre_dt_tm <= $3::timestamptz", + data_cache.dbtr_acct_id, + tx_tp, + current_pacs002_timeframe, + range, + ) + .fetch_one(&state.services.postgres) + .instrument(span) + .await? + .ok_or_else(|| anyhow::anyhow!("no data"))?; + + determine_outcome(recent_transactions, bands.as_ref(), &mut rule_result); + + Ok(rule_result) + } else { + Err(anyhow::anyhow!("no valid transaction")) + } +} diff --git a/crates/rule-executor/src/state.rs b/crates/rule-executor/src/state.rs index efad4ea..432068e 100644 --- a/crates/rule-executor/src/state.rs +++ b/crates/rule-executor/src/state.rs @@ -9,7 +9,7 @@ use warden_core::configuration::rule::{ RuleConfiguration, RuleConfigurationRequest, query_rule_configuration_client::QueryRuleConfigurationClient, }; -use warden_stack::Configuration; +use warden_stack::{Configuration, sqlx::PgPool}; use crate::cnfg::LocalConfig; use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor}; @@ -17,6 +17,7 @@ use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor}; #[derive(Clone)] pub struct Services { pub jetstream: Context, + pub postgres: PgPool, } pub type AppHandle = Arc<AppState>; diff --git a/crates/typologies/src/main.rs b/crates/typologies/src/main.rs index ea7843a..e96f6bb 100644 --- a/crates/typologies/src/main.rs +++ b/crates/typologies/src/main.rs @@ -45,6 +45,9 @@ async fn main() -> Result<()> { .nats_jetstream(&config.nats) .await .inspect_err(|e| error!("nats: {e}"))? + .cache(&config.cache) + .await + .inspect_err(|e| error!("cache: {e}"))? .build(); let jetstream = services diff --git a/crates/typologies/src/processor/typology.rs b/crates/typologies/src/processor/typology.rs index b1b2592..62e7089 100644 --- a/crates/typologies/src/processor/typology.rs +++ b/crates/typologies/src/processor/typology.rs @@ -83,7 +83,7 @@ pub async fn process_typology( Ok(()) } -#[instrument(skip(typology_result, routing, payload, state), err(Debug))] +#[instrument(skip(routing, payload, state), err(Debug))] async fn evaluate_typology( typology_result: &mut [TypologyResult], routing: &RoutingConfiguration, diff --git a/crates/warden/src/server.rs b/crates/warden/src/server.rs index 760138d..3dd170b 100644 --- a/crates/warden/src/server.rs +++ b/crates/warden/src/server.rs @@ -68,3 +68,233 @@ pub(crate) fn test_config() -> warden_stack::Configuration { config.try_deserialize::<Configuration>().unwrap() } + +#[cfg(test)] +pub(crate) fn generate_id() -> String { + let id = uuid::Uuid::new_v4().to_string(); + id.replace("-", "") +} + +#[cfg(test)] +pub(crate) fn test_pacs008() -> warden_core::iso20022::pacs008::Pacs008Document { + let msg_id = generate_id(); + let cre_dt_tm = time::OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .unwrap(); + let end_to_end_id = generate_id(); + + let debtor_fsp = "fsp001"; + let creditor_fsp = "fsp002"; + + let ccy = "XTS"; + + let v = serde_json::json!({ + "f_i_to_f_i_cstmr_cdt_trf": { + "grp_hdr": { + "msg_id": msg_id, + "cre_dt_tm": cre_dt_tm, + "nb_of_txs": "CLRG", + "sttlm_inf": { + "sttlm_mtd": 1 + } + }, + "splmtry_data": [], + "cdt_trf_tx_inf": [ + { + "pmt_id": { + "instr_id": generate_id(), + "end_to_end_id": end_to_end_id + }, + "intr_bk_sttlm_amt": { + "value": 294.3, + "ccy": ccy, + }, + "instd_amt": { + "value": 294.3, + "ccy": ccy + }, + "xchg_rate": 1, + "chrg_br": 1, + "chrgs_inf": [ + { + "amt": { + "value": 0, + "ccy": ccy + }, + "agt": { + "fin_instn_id": { + "clr_sys_mmb_id": { + "mmb_id": debtor_fsp, + } + } + } + } + ], + "initg_pty": { + "nm": "April Blake Grant", + "id": { + "org_id": { + "othr": [] + }, + "prvt_id": { + "dt_and_plc_of_birth": { + "birth_dt": "1968-02-01", + "city_of_birth": "Unknown", + "ctry_of_birth": "ZZ" + }, + "othr": [ + { + "id": "+27730975224", + "schme_nm": { + "prtry": "MSISDN", + "cd": "cd-value" + } + } + ] + } + }, + "ctct_dtls": { + "mob_nb": "+27-730975224", + "othr": [] + } + }, + "dbtr": { + "nm": "April Blake Grant", + "id": { + "org_id": { + "othr": [] + }, + "prvt_id": { + "dt_and_plc_of_birth": { + "birth_dt": "2000-07-23", + "city_of_birth": "Unknown", + "ctry_of_birth": "ZZ" + }, + "othr": [ + { + "id": generate_id(), + "schme_nm": { + "prtry": "EID", + "cd": "cd-value" + } + } + ] + } + }, + "ctct_dtls": { + "mob_nb": "+27-730975224", + "othr": [] + } + }, + "dbtr_acct": { + "id": { + "i_b_a_n": "value", + "othr": { + "id": generate_id(), + "schme_nm": { + "prtry": "MSISDN", + "cd": "value" + } + } + }, + "nm": "April Grant" + }, + "dbtr_agt": { + "fin_instn_id": { + "clr_sys_mmb_id": { + "mmb_id": debtor_fsp, + } + } + }, + "cdtr_agt": { + "fin_instn_id": { + "clr_sys_mmb_id": { + "mmb_id": creditor_fsp, + } + } + }, + "cdtr": { + "nm": "Felicia Easton Quill", + "id": { + "org_id": { + "othr": [] + }, + "prvt_id": { + "dt_and_plc_of_birth": { + "birth_dt": "1935-05-08", + "city_of_birth": "Unknown", + "ctry_of_birth": "ZZ" + }, + "othr": [ + { + "id": generate_id(), + "schme_nm": { + "prtry": "EID", + "cd": "" + } + } + ] + } + }, + "ctct_dtls": { + "mob_nb": "+27-707650428", + "othr": [] + } + }, + "cdtr_acct": { + "id": { + "i_b_a_n": "", + "othr": { + "id": generate_id(), + "schme_nm": { + "prtry": "MSISDN", + "cd": "acc" + } + } + }, + "nm": "Felicia Quill" + }, + "purp": { + "cd": "MP2P", + "prtry": "" + }, + "rgltry_rptg": [ + { + "dtls": [ + { + "tp": "BALANCE OF PAYMENTS", + "cd": "100", + "inf": [] + } + ] + } + ], + "rmt_inf": { + "ustrd": [], + "strd": [] + }, + "splmtry_data": [ + { + "envlp": { + "doc": { + "xprtn": "2021-11-30T10:38:56.000Z", + "initg_pty": { + "glctn": { + "lat": "-3.1609", + "long": "38.3588" + } + } + } + } + } + ], + "instr_for_cdtr_agt": [], + "instr_for_nxt_agt": [], + "rltd_rmt_inf": [] + } + ] + } + }); + + serde_json::from_value(v).unwrap() +} diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs index 8d94ca9..af3f06f 100644 --- a/crates/warden/src/server/routes/processor/pacs008.rs +++ b/crates/warden/src/server/routes/processor/pacs008.rs @@ -62,22 +62,22 @@ pub(super) async fn post_pacs008( ); } - // take the first + // take the first - guaranteed by utoipa trace!("extracting first credit transfer transaction info"); - let cdt_trf_tx_inf = transaction.f_i_to_f_i_cstmr_cdt_trf.cdt_trf_tx_inf.first(); - - let amount = cdt_trf_tx_inf.and_then(|value| value.instd_amt.as_ref().map(|value| value.value)); + let cdt_trf_tx_inf = transaction + .f_i_to_f_i_cstmr_cdt_trf + .cdt_trf_tx_inf + .first() + .expect("required cdt_trf_tx_inf missing"); - let ccy = - cdt_trf_tx_inf.and_then(|value| value.instd_amt.as_ref().map(|value| value.ccy.as_str())); + let amount = cdt_trf_tx_inf.instd_amt.as_ref().map(|value| value.value); - let end_to_end_id = cdt_trf_tx_inf + let ccy = cdt_trf_tx_inf + .instd_amt .as_ref() - .map(|value| value.pmt_id.end_to_end_id.as_str()) - .ok_or_else(|| { - error!("missing end_to_end_id"); - anyhow::anyhow!("missing end_to_end_id id") - })?; + .map(|value| value.ccy.as_str()); + + let end_to_end_id = cdt_trf_tx_inf.pmt_id.end_to_end_id.as_str(); tracing::Span::current().record("end_to_end_id", end_to_end_id); let end_to_end_id = String::from(end_to_end_id); @@ -85,12 +85,10 @@ pub(super) async fn post_pacs008( let msg_id = &transaction.f_i_to_f_i_cstmr_cdt_trf.grp_hdr.msg_id; tracing::Span::current().record("msg_id", msg_id); - let pmt_inf_id = cdt_trf_tx_inf - .and_then(|value| value.pmt_id.instr_id.as_ref()) - .ok_or_else(|| { - error!("missing pmt_inf_id"); - anyhow::anyhow!("missing pmt_inf_id id") - })?; + let pmt_inf_id = cdt_trf_tx_inf.pmt_id.instr_id.as_ref().ok_or_else(|| { + error!("missing pmt_inf_id"); + anyhow::anyhow!("missing pmt_inf_id id") + })?; debug!(%msg_id, %end_to_end_id, "extracted transaction identifiers"); @@ -324,11 +322,10 @@ mod tests { use sqlx::PgPool; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use tower::ServiceExt; - use uuid::Uuid; use warden_stack::cache::RedisManager; use crate::{ - server::{self, test_config}, + server::{self, generate_id, metrics_app, test_config}, state::{AppState, Services}, }; @@ -350,226 +347,31 @@ mod tests { ) .await .unwrap(); - let app = server::router(state); - - let ccy = "XTS"; + let app = server::router(state).merge(metrics_app()); - let msg_id = generate_id(); - let cre_dt_tm = OffsetDateTime::now_utc().format(&Rfc3339).unwrap(); + let pacs = server::test_pacs008(); - let debtor_fsp = "fsp001"; - let creditor_fsp = "fsp002"; + let inf = &pacs.f_i_to_f_i_cstmr_cdt_trf.cdt_trf_tx_inf[0]; + let ccy = &inf.intr_bk_sttlm_amt.as_ref().unwrap().ccy; + let end_to_end_id = &inf.pmt_id.end_to_end_id; + let debtor_fsp = &inf.chrgs_inf[0] + .agt + .fin_instn_id + .clr_sys_mmb_id + .as_ref() + .unwrap() + .mmb_id; + let creditor_fsp = &inf + .cdtr_agt + .as_ref() + .unwrap() + .fin_instn_id + .clr_sys_mmb_id + .as_ref() + .unwrap() + .mmb_id; - let end_to_end_id = generate_id(); - - let v = serde_json::json!({ - "f_i_to_f_i_cstmr_cdt_trf": { - "grp_hdr": { - "msg_id": msg_id, - "cre_dt_tm": cre_dt_tm, - "nb_of_txs": "CLRG", - "sttlm_inf": { - "sttlm_mtd": 1 - } - }, - "splmtry_data": [], - "cdt_trf_tx_inf": [ - { - "pmt_id": { - "instr_id": generate_id(), - "end_to_end_id": end_to_end_id - }, - "intr_bk_sttlm_amt": { - "value": 294.3, - "ccy": ccy, - }, - "instd_amt": { - "value": 294.3, - "ccy": ccy - }, - "xchg_rate": 1, - "chrg_br": 1, - "chrgs_inf": [ - { - "amt": { - "value": 0, - "ccy": ccy - }, - "agt": { - "fin_instn_id": { - "clr_sys_mmb_id": { - "mmb_id": debtor_fsp, - } - } - } - } - ], - "initg_pty": { - "nm": "April Blake Grant", - "id": { - "org_id": { - "othr": [] - }, - "prvt_id": { - "dt_and_plc_of_birth": { - "birth_dt": "1968-02-01", - "city_of_birth": "Unknown", - "ctry_of_birth": "ZZ" - }, - "othr": [ - { - "id": "+27730975224", - "schme_nm": { - "prtry": "MSISDN", - "cd": "cd-value" - } - } - ] - } - }, - "ctct_dtls": { - "mob_nb": "+27-730975224", - "othr": [] - } - }, - "dbtr": { - "nm": "April Blake Grant", - "id": { - "org_id": { - "othr": [] - }, - "prvt_id": { - "dt_and_plc_of_birth": { - "birth_dt": "2000-07-23", - "city_of_birth": "Unknown", - "ctry_of_birth": "ZZ" - }, - "othr": [ - { - "id": generate_id(), - "schme_nm": { - "prtry": "EID", - "cd": "cd-value" - } - } - ] - } - }, - "ctct_dtls": { - "mob_nb": "+27-730975224", - "othr": [] - } - }, - "dbtr_acct": { - "id": { - "i_b_a_n": "value", - "othr": { - "id": generate_id(), - "schme_nm": { - "prtry": "MSISDN", - "cd": "value" - } - } - }, - "nm": "April Grant" - }, - "dbtr_agt": { - "fin_instn_id": { - "clr_sys_mmb_id": { - "mmb_id": debtor_fsp, - } - } - }, - "cdtr_agt": { - "fin_instn_id": { - "clr_sys_mmb_id": { - "mmb_id": creditor_fsp, - } - } - }, - "cdtr": { - "nm": "Felicia Easton Quill", - "id": { - "org_id": { - "othr": [] - }, - "prvt_id": { - "dt_and_plc_of_birth": { - "birth_dt": "1935-05-08", - "city_of_birth": "Unknown", - "ctry_of_birth": "ZZ" - }, - "othr": [ - { - "id": generate_id(), - "schme_nm": { - "prtry": "EID", - "cd": "" - } - } - ] - } - }, - "ctct_dtls": { - "mob_nb": "+27-707650428", - "othr": [] - } - }, - "cdtr_acct": { - "id": { - "i_b_a_n": "", - "othr": { - "id": generate_id(), - "schme_nm": { - "prtry": "MSISDN", - "cd": "acc" - } - } - }, - "nm": "Felicia Quill" - }, - "purp": { - "cd": "MP2P", - "prtry": "" - }, - "rgltry_rptg": [ - { - "dtls": [ - { - "tp": "BALANCE OF PAYMENTS", - "cd": "100", - "inf": [] - } - ] - } - ], - "rmt_inf": { - "ustrd": [], - "strd": [] - }, - "splmtry_data": [ - { - "envlp": { - "doc": { - "xprtn": "2021-11-30T10:38:56.000Z", - "initg_pty": { - "glctn": { - "lat": "-3.1609", - "long": "38.3588" - } - } - } - } - } - ], - "instr_for_cdtr_agt": [], - "instr_for_nxt_agt": [], - "rltd_rmt_inf": [] - } - ] - } - }); - let body = serde_json::to_vec(&v).unwrap(); + let body = serde_json::to_vec(&pacs).unwrap(); let response = app .clone() @@ -586,12 +388,93 @@ mod tests { assert_eq!(response.status(), StatusCode::CREATED); - post_clearance(app, &end_to_end_id, ccy, debtor_fsp, creditor_fsp).await; + post_clearance(app, end_to_end_id, ccy, debtor_fsp, creditor_fsp).await; + } + + #[sqlx::test] + async fn post_missing_e2e(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + let app = server::router(state); + // no end to end id + + let mut pacs = server::test_pacs008(); + pacs.f_i_to_f_i_cstmr_cdt_trf.cdt_trf_tx_inf = vec![]; + + let body = serde_json::to_vec(&pacs).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/pacs008") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); } - fn generate_id() -> String { - let id = Uuid::new_v4().to_string(); - id.replace("-", "") + #[sqlx::test] + async fn post_missing_pmt_id(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + let app = server::router(state); + // no end to end id + + let mut pacs = server::test_pacs008(); + pacs.f_i_to_f_i_cstmr_cdt_trf.cdt_trf_tx_inf[0] + .pmt_id + .instr_id = None; + + let body = serde_json::to_vec(&pacs).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v0/pacs008") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); } async fn post_clearance( diff --git a/crates/warden/src/version.rs b/crates/warden/src/version.rs index 4eb5677..1af6891 100644 --- a/crates/warden/src/version.rs +++ b/crates/warden/src/version.rs @@ -33,3 +33,275 @@ where } } } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use sqlx::PgPool; + use time::{OffsetDateTime, format_description::well_known::Rfc3339}; + use tower::ServiceExt; + use warden_stack::cache::RedisManager; + + use crate::{ + server::{self, generate_id, test_config}, + state::{AppState, Services}, + }; + + #[sqlx::test] + async fn invalid_version(pool: PgPool) { + let config = test_config(); + + let cache = RedisManager::new(&config.cache).await.unwrap(); + let client = async_nats::connect(&config.nats.hosts[0]).await.unwrap(); + let jetstream = async_nats::jetstream::new(client); + + let state = AppState::create( + Services { + postgres: pool, + cache, + jetstream, + }, + &test_config(), + ) + .await + .unwrap(); + let app = server::router(state); + + let ccy = "XTS"; + + let msg_id = generate_id(); + let cre_dt_tm = OffsetDateTime::now_utc().format(&Rfc3339).unwrap(); + + let debtor_fsp = "fsp001"; + let creditor_fsp = "fsp002"; + + let end_to_end_id = generate_id(); + + let v = serde_json::json!({ + "f_i_to_f_i_cstmr_cdt_trf": { + "grp_hdr": { + "msg_id": msg_id, + "cre_dt_tm": cre_dt_tm, + "nb_of_txs": "CLRG", + "sttlm_inf": { + "sttlm_mtd": 1 + } + }, + "splmtry_data": [], + "cdt_trf_tx_inf": [ + { + "pmt_id": { + "instr_id": generate_id(), + "end_to_end_id": end_to_end_id + }, + "intr_bk_sttlm_amt": { + "value": 294.3, + "ccy": ccy, + }, + "instd_amt": { + "value": 294.3, + "ccy": ccy + }, + "xchg_rate": 1, + "chrg_br": 1, + "chrgs_inf": [ + { + "amt": { + "value": 0, + "ccy": ccy + }, + "agt": { + "fin_instn_id": { + "clr_sys_mmb_id": { + "mmb_id": debtor_fsp, + } + } + } + } + ], + "initg_pty": { + "nm": "April Blake Grant", + "id": { + "org_id": { + "othr": [] + }, + "prvt_id": { + "dt_and_plc_of_birth": { + "birth_dt": "1968-02-01", + "city_of_birth": "Unknown", + "ctry_of_birth": "ZZ" + }, + "othr": [ + { + "id": "+27730975224", + "schme_nm": { + "prtry": "MSISDN", + "cd": "cd-value" + } + } + ] + } + }, + "ctct_dtls": { + "mob_nb": "+27-730975224", + "othr": [] + } + }, + "dbtr": { + "nm": "April Blake Grant", + "id": { + "org_id": { + "othr": [] + }, + "prvt_id": { + "dt_and_plc_of_birth": { + "birth_dt": "2000-07-23", + "city_of_birth": "Unknown", + "ctry_of_birth": "ZZ" + }, + "othr": [ + { + "id": generate_id(), + "schme_nm": { + "prtry": "EID", + "cd": "cd-value" + } + } + ] + } + }, + "ctct_dtls": { + "mob_nb": "+27-730975224", + "othr": [] + } + }, + "dbtr_acct": { + "id": { + "i_b_a_n": "value", + "othr": { + "id": generate_id(), + "schme_nm": { + "prtry": "MSISDN", + "cd": "value" + } + } + }, + "nm": "April Grant" + }, + "dbtr_agt": { + "fin_instn_id": { + "clr_sys_mmb_id": { + "mmb_id": debtor_fsp, + } + } + }, + "cdtr_agt": { + "fin_instn_id": { + "clr_sys_mmb_id": { + "mmb_id": creditor_fsp, + } + } + }, + "cdtr": { + "nm": "Felicia Easton Quill", + "id": { + "org_id": { + "othr": [] + }, + "prvt_id": { + "dt_and_plc_of_birth": { + "birth_dt": "1935-05-08", + "city_of_birth": "Unknown", + "ctry_of_birth": "ZZ" + }, + "othr": [ + { + "id": generate_id(), + "schme_nm": { + "prtry": "EID", + "cd": "" + } + } + ] + } + }, + "ctct_dtls": { + "mob_nb": "+27-707650428", + "othr": [] + } + }, + "cdtr_acct": { + "id": { + "i_b_a_n": "", + "othr": { + "id": generate_id(), + "schme_nm": { + "prtry": "MSISDN", + "cd": "acc" + } + } + }, + "nm": "Felicia Quill" + }, + "purp": { + "cd": "MP2P", + "prtry": "" + }, + "rgltry_rptg": [ + { + "dtls": [ + { + "tp": "BALANCE OF PAYMENTS", + "cd": "100", + "inf": [] + } + ] + } + ], + "rmt_inf": { + "ustrd": [], + "strd": [] + }, + "splmtry_data": [ + { + "envlp": { + "doc": { + "xprtn": "2021-11-30T10:38:56.000Z", + "initg_pty": { + "glctn": { + "lat": "-3.1609", + "long": "38.3588" + } + } + } + } + } + ], + "instr_for_cdtr_agt": [], + "instr_for_nxt_agt": [], + "rltd_rmt_inf": [] + } + ] + } + }); + let body = serde_json::to_vec(&v).unwrap(); + + let response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .header("Content-Type", "application/json") + .uri("/api/v99/pacs008") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } +} diff --git a/lib/warden-core/build.rs b/lib/warden-core/build.rs index 9d8747f..0fa0722 100644 --- a/lib/warden-core/build.rs +++ b/lib/warden-core/build.rs @@ -166,4 +166,8 @@ fn add_openapi(config: tonic_prost_build::Builder) -> tonic_prost_build::Builder ".configuration.rule.DeleteRuleConfigurationRequest", "#[derive(utoipa::IntoParams)]", ) + .field_attribute( + ".iso20022.pacs008.FIToFICustomerCreditTransferV12.cdt_trf_tx_inf", + "#[schema(min_items = 1, max_items = 1)]", + ) } diff --git a/lib/warden-core/src/configuration/conv.rs b/lib/warden-core/src/configuration/conv.rs index 7f982b4..3b0fef9 100644 --- a/lib/warden-core/src/configuration/conv.rs +++ b/lib/warden-core/src/configuration/conv.rs @@ -266,7 +266,7 @@ mod tests { "x": 1, "y": [true, null, "str"], "z": { - "nested": 3.14 + "nested": 3.90 } }); diff --git a/lib/warden-core/src/google/parser/money.rs b/lib/warden-core/src/google/parser/money.rs index a703a4a..c12bfb1 100644 --- a/lib/warden-core/src/google/parser/money.rs +++ b/lib/warden-core/src/google/parser/money.rs @@ -172,7 +172,7 @@ mod tests { #[test] fn test_round_trip_conversion() { - let original = 1234.567_890_123; + let original = 1_234.567_890_123; let money = Money::try_from((original, "USD")).unwrap(); let back: f64 = money.into(); assert!( diff --git a/lib/warden-stack/src/tracing.rs b/lib/warden-stack/src/tracing.rs index 1a40f4b..001e502 100644 --- a/lib/warden-stack/src/tracing.rs +++ b/lib/warden-stack/src/tracing.rs @@ -46,21 +46,32 @@ impl<S: tracing_builder::IsComplete> TracingBuilder<S> { } } -// #[cfg(test)] -// mod tests { -// use super::*; -// -// #[test] -// fn build() { -// let builder = Tracing::builder().build(); -// let level = crate::Monitoring { -// log_level: "info".to_string(), -// #[cfg(feature = "opentelemetry")] -// opentelemetry_endpoint: "http://localhost:4317".into(), -// #[cfg(feature = "tracing-loki")] -// loki_endpoint: "http://localhost:3100".into(), -// }; -// builder.init(&level); -// builder.loki_task -// } -// } +#[cfg(test)] +mod tests { + use crate::{AppConfig, Environment, Monitoring}; + + use super::*; + + #[tokio::test] + async fn build() { + let config = Monitoring { + log_level: "error".to_string(), + opentelemetry_endpoint: "http://localhost:4317".into(), + loki_endpoint: "http://localhost:3100".into(), + }; + + let app_config = AppConfig { + name: "test".into(), + version: "1.0.0".into(), + env: Environment::Development, + port: 6969, + }; + + let tracing = Tracing::builder().opentelemetry(&app_config, &config); + assert!(tracing.is_ok()); + + let tracing = tracing.unwrap().loki(&app_config, &config); + + assert!(tracing.is_ok()); + } +} diff --git a/proto/configuration/rule.proto b/proto/configuration/rule.proto index f963fde..1d21e31 100644 --- a/proto/configuration/rule.proto +++ b/proto/configuration/rule.proto @@ -52,7 +52,9 @@ message DeleteRuleConfigurationRequest { } message UpdateRuleRequest { - RuleConfiguration configuration = 1; + string id = 1; + string version = 2; + RuleConfiguration configuration = 3; } message GetRuleConfigResponse { diff --git a/proto/configuration/typology.proto b/proto/configuration/typology.proto index bae4630..2fa26c3 100644 --- a/proto/configuration/typology.proto +++ b/proto/configuration/typology.proto @@ -59,7 +59,9 @@ message GetTypologyConfigResponse { } message UpdateTypologyConfigRequest { - TypologyConfiguration configuration = 1; + string id = 1; + string version = 2; + TypologyConfiguration configuration = 3; } service QueryTypologies { |