diff options
| author | rtkay123 <dev@kanjala.com> | 2026-04-01 09:05:33 +0200 |
|---|---|---|
| committer | rtkay123 <dev@kanjala.com> | 2026-04-01 09:05:33 +0200 |
| commit | daeb5311840680599a0ce6e49d181b9289010f68 (patch) | |
| tree | d0c9c040ca003a6d431781b867c4290cbe5c9ef2 | |
| parent | 2c336f0339747aa77a8fe6613b83200c8d4902a5 (diff) | |
| download | warden-daeb5311840680599a0ce6e49d181b9289010f68.tar.bz2 warden-daeb5311840680599a0ce6e49d181b9289010f68.zip | |
feat(schema): cursor pagination
35 files changed, 497 insertions, 145 deletions
diff --git a/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json b/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json index 0fa6500..71caaab 100644 --- a/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json +++ b/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json @@ -5,26 +5,31 @@ "columns": [ { "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, "name": "schema_type", "type_info": "Text" }, { - "ordinal": 1, + "ordinal": 2, "name": "schema_version", "type_info": "Varchar" }, { - "ordinal": 2, + "ordinal": 3, "name": "schema", "type_info": "Jsonb" }, { - "ordinal": 3, + "ordinal": 4, "name": "created_at", "type_info": "Timestamptz" }, { - "ordinal": 4, + "ordinal": 5, "name": "updated_at", "type_info": "Timestamptz" } @@ -41,6 +46,7 @@ false, false, false, + false, false ] }, diff --git a/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json b/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json index 0453107..a4918e2 100644 --- a/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json +++ b/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json @@ -5,26 +5,31 @@ "columns": [ { "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, "name": "schema_type", "type_info": "Text" }, { - "ordinal": 1, + "ordinal": 2, "name": "schema_version", "type_info": "Varchar" }, { - "ordinal": 2, + "ordinal": 3, "name": "schema", "type_info": "Jsonb" }, { - "ordinal": 3, + "ordinal": 4, "name": "created_at", "type_info": "Timestamptz" }, { - "ordinal": 4, + "ordinal": 5, "name": "updated_at", "type_info": "Timestamptz" } @@ -41,6 +46,7 @@ false, false, false, + false, false ] }, diff --git a/.sqlx/query-528f112b0ca941f4f1dbd61c36db7f8fa12d82bbde8f9e8abeb6f8961d18a971.json b/.sqlx/query-528f112b0ca941f4f1dbd61c36db7f8fa12d82bbde8f9e8abeb6f8961d18a971.json new file mode 100644 index 0000000..7d6d63d --- /dev/null +++ b/.sqlx/query-528f112b0ca941f4f1dbd61c36db7f8fa12d82bbde8f9e8abeb6f8961d18a971.json @@ -0,0 +1,55 @@ +{ + "db_name": "PostgreSQL", + "query": "\n select * from (\n select * from transaction_schema\n where ($1::bigint is null or (id, schema_type, schema_version) < ($1, $2, $3))\n order by id desc, schema_type desc, schema_version desc\n limit $4\n ) sub\n order by id asc, schema_type asc, schema_version asc\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "schema_type", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "schema_version", + "type_info": "Varchar" + }, + { + "ordinal": 3, + "name": "schema", + "type_info": "Jsonb" + }, + { + "ordinal": 4, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "updated_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int8", + "Text", + "Text", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "528f112b0ca941f4f1dbd61c36db7f8fa12d82bbde8f9e8abeb6f8961d18a971" +} diff --git a/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json b/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json index 5e32b25..0c410f0 100644 --- a/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json +++ b/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json @@ -5,26 +5,31 @@ "columns": [ { "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, "name": "schema_type", "type_info": "Text" }, { - "ordinal": 1, + "ordinal": 2, "name": "schema_version", "type_info": "Varchar" }, { - "ordinal": 2, + "ordinal": 3, "name": "schema", "type_info": "Jsonb" }, { - "ordinal": 3, + "ordinal": 4, "name": "created_at", "type_info": "Timestamptz" }, { - "ordinal": 4, + "ordinal": 5, "name": "updated_at", "type_info": "Timestamptz" } @@ -40,6 +45,7 @@ false, false, false, + false, false ] }, diff --git a/.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json b/.sqlx/query-bd0c565ebe2ea4b64d4c8940ac7162cf70134d0af9c85e2571d2727ffd1bc767.json index cee9aca..655e358 100644 --- a/.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json +++ b/.sqlx/query-bd0c565ebe2ea4b64d4c8940ac7162cf70134d0af9c85e2571d2727ffd1bc767.json @@ -1,36 +1,42 @@ { "db_name": "PostgreSQL", - "query": "\n select *\n from transaction_schema\n where ($1 = '' or (schema_type, schema_version) > ($1, $2))\n order by schema_type asc, schema_version asc\n limit $3\n ", + "query": "\n select * from transaction_schema\n where ($1::bigint is null or (id, schema_type, schema_version) > ($1, $2, $3))\n order by id asc, schema_type asc, schema_version asc\n limit $4\n ", "describe": { "columns": [ { "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, "name": "schema_type", "type_info": "Text" }, { - "ordinal": 1, + "ordinal": 2, "name": "schema_version", "type_info": "Varchar" }, { - "ordinal": 2, + "ordinal": 3, "name": "schema", "type_info": "Jsonb" }, { - "ordinal": 3, + "ordinal": 4, "name": "created_at", "type_info": "Timestamptz" }, { - "ordinal": 4, + "ordinal": 5, "name": "updated_at", "type_info": "Timestamptz" } ], "parameters": { "Left": [ + "Int8", "Text", "Text", "Int8" @@ -41,8 +47,9 @@ false, false, false, + false, false ] }, - "hash": "48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f" + "hash": "bd0c565ebe2ea4b64d4c8940ac7162cf70134d0af9c85e2571d2727ffd1bc767" } @@ -99,6 +99,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "base64", "serde", "serde_json", "sqlx", @@ -3102,6 +3103,7 @@ dependencies = [ name = "warden-core" version = "0.1.0" dependencies = [ + "base64", "clap", "serde", "sqlx", @@ -3109,6 +3111,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "utoipa", ] [[package]] @@ -12,6 +12,7 @@ publish = false [workspace.dependencies] anyhow = "1.0.102" async-trait = "0.1.89" +base64 = "0.22.1" clap = "4.6.0" jsonschema = "0.45.0" secrecy = { version = "0.10.3", features = ["serde"] } diff --git a/lib/api-config/Cargo.toml b/lib/api-config/Cargo.toml index 6724306..e19fcb5 100644 --- a/lib/api-config/Cargo.toml +++ b/lib/api-config/Cargo.toml @@ -10,6 +10,7 @@ publish.workspace = true [dependencies] async-trait.workspace = true +base64.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true diff --git a/lib/api-config/src/error.rs b/lib/api-config/src/error.rs index b2dc9ad..a6c4991 100644 --- a/lib/api-config/src/error.rs +++ b/lib/api-config/src/error.rs @@ -10,4 +10,10 @@ pub enum ConfigurationError { InvalidHeader { expected: String, found: String }, #[error("unknown data store error")] Unknown, + #[error(transparent)] + Pagination(#[from] base64::DecodeError), + #[error(transparent)] + PaginationCursor(#[from] std::string::FromUtf8Error), + #[error(transparent)] + PaginationId(#[from] std::num::ParseIntError), } diff --git a/lib/api-config/src/lib.rs b/lib/api-config/src/lib.rs index 0d0117a..c2c6ccb 100644 --- a/lib/api-config/src/lib.rs +++ b/lib/api-config/src/lib.rs @@ -1,5 +1,11 @@ +//! Configuration +#![warn(missing_docs, missing_debug_implementations)] mod error; +/// Schema configuration implementation pub mod schema; +/// Errors returned by the configuration library pub use error::ConfigurationError; -pub(crate) static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("../../migrations"); +#[cfg(test)] +/// Database migrator +pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("../../migrations"); diff --git a/lib/api-config/src/schema/create_schema.rs b/lib/api-config/src/schema/create_schema.rs index 493fb09..cdbc91f 100644 --- a/lib/api-config/src/schema/create_schema.rs +++ b/lib/api-config/src/schema/create_schema.rs @@ -6,7 +6,7 @@ use crate::{ schema::{SchemaService, TransactionSchema}, }; -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Debug)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] #[cfg_attr(feature = "utoipa", schema(example = json!({ "schemaType": "custom.schema", diff --git a/lib/api-config/src/schema/implementation.rs b/lib/api-config/src/schema/implementation.rs index c414879..ca0757f 100644 --- a/lib/api-config/src/schema/implementation.rs +++ b/lib/api-config/src/schema/implementation.rs @@ -1,7 +1,10 @@ use async_trait::async_trait; use tracing::debug; +use warden_core::pagination::{Connection, PaginationArgs}; -use crate::schema::{self, SchemaDriver, SchemaService, TransactionSchema}; +use crate::schema::{ + self, SchemaDriver, SchemaService, TransactionSchema, pagination::DecodedSchemaPagination, +}; #[async_trait] impl SchemaDriver for SchemaService { @@ -44,41 +47,12 @@ impl SchemaDriver for SchemaService { } #[tracing::instrument(skip(self))] - async fn get_schemas( + async fn list_schemas( &self, + input: &PaginationArgs, limit: i64, - first: Option<i64>, - after: Option<&str>, - ) -> Result<Vec<TransactionSchema>, crate::ConfigurationError> { + ) -> Result<Connection<TransactionSchema>, crate::ConfigurationError> { debug!("getting transaction schemas"); - let limit = first.unwrap_or(limit); - let mut last_type = String::default(); - let mut last_version = String::default(); - - if let Some(s) = after { - let parts: Vec<&str> = s.split(',').collect(); - if parts.len() == 2 { - last_type = parts[0].to_string(); - last_version = parts[1].to_string(); - } - } - - let rows = sqlx::query_as!( - TransactionSchema, - " - select * - from transaction_schema - where ($1 = '' or (schema_type, schema_version) > ($1, $2)) - order by schema_type asc, schema_version asc - limit $3 - ", - &last_type, - &last_version, - limit + 1 - ) - .fetch_all(&self.database) - .await?; - - Ok(rows) + schema::list_schemas::list_schemas(self, input, limit).await } } diff --git a/lib/api-config/src/schema/list_schemas.rs b/lib/api-config/src/schema/list_schemas.rs new file mode 100644 index 0000000..0b539ed --- /dev/null +++ b/lib/api-config/src/schema/list_schemas.rs @@ -0,0 +1,213 @@ +use base64::{Engine, engine::general_purpose}; +use tracing::debug; +use warden_core::pagination::{Connection, Edge, PageInfo, PaginationArgs}; + +use crate::{ + ConfigurationError, + schema::{SchemaService, TransactionSchema, pagination::DecodedSchemaPagination}, +}; + +fn decode_cursors( + args: &PaginationArgs, + limit: i64, +) -> Result<DecodedSchemaPagination, ConfigurationError> { + let limit = args + .first + .or(args.last) + .map(|value| if value > limit { limit } else { value }) + .unwrap_or(limit); + + // Fetch limit + 1 to check if there's a next/prev page + let query_limit = limit + 1; + + let (id, type_cursor, ver_cursor) = + if let Some(cursor) = args.after.as_ref().or(args.before.as_ref()) { + let decoded = general_purpose::STANDARD.decode(cursor)?; + let s = String::from_utf8(decoded)?; + let parts: Vec<&str> = s.split(',').collect(); + ( + Some(parts[0].parse()?), + Some(parts[1].to_string()), + Some(parts[2].to_string()), + ) + } else { + (None, None, None) + }; + + Ok(DecodedSchemaPagination { + type_cursor, + version_cursor: ver_cursor, + limit: query_limit, + is_last: args.last.is_some(), + base_limit: limit, + id, + }) +} + +pub(super) async fn list_schemas( + state: &SchemaService, + args: &PaginationArgs, + limit: i64, +) -> Result<Connection<TransactionSchema>, ConfigurationError> { + debug!("listing schemas transaction schema"); + + let input = decode_cursors(args, limit)?; + + let rows = if input.is_last { + // BACKWARD PAGINATION + sqlx::query_as!( + TransactionSchema, + " + select * from ( + select * from transaction_schema + where ($1::bigint is null or (id, schema_type, schema_version) < ($1, $2, $3)) + order by id desc, schema_type desc, schema_version desc + limit $4 + ) sub + order by id asc, schema_type asc, schema_version asc + ", + input.id, + input.type_cursor, + input.version_cursor, + input.limit + ) + .fetch_all(&state.database) + .await? + } else { + // FORWARD PAGINATION (Default) + sqlx::query_as!( + TransactionSchema, + " + select * from transaction_schema + where ($1::bigint is null or (id, schema_type, schema_version) > ($1, $2, $3)) + order by id asc, schema_type asc, schema_version asc + limit $4 + ", + input.id, + input.type_cursor, + input.version_cursor, + input.limit + ) + .fetch_all(&state.database) + .await? + }; + + let res = encode_cursors(&rows, &input)?; + + Ok(res) +} + +fn encode_cursors( + rows: &[TransactionSchema], + args: &DecodedSchemaPagination, +) -> Result<Connection<TransactionSchema>, ConfigurationError> { + let has_extra = rows.len() > args.base_limit as usize; + let final_rows = if has_extra { + if args.is_last { + &rows[1..] + } else { + &rows[..args.base_limit as usize] + } + } else { + rows + }; + + let edges = final_rows + .iter() + .map(|row| { + let raw_cursor = format!("{},{},{}", row.id, row.schema_type, row.schema_version); + Edge { + node: row.clone(), + cursor: general_purpose::STANDARD.encode(raw_cursor), + } + }) + .collect::<Vec<_>>(); + + let page_info = PageInfo { + has_next_page: if args.is_last { false } else { has_extra }, + has_previous_page: if args.is_last { has_extra } else { false }, + start_cursor: edges.first().map(|e| e.cursor.clone()), + end_cursor: edges.last().map(|e| e.cursor.clone()), + }; + + Ok(Connection { edges, page_info }) +} + +#[cfg(test)] +mod tests { + use base64::{Engine, engine::general_purpose}; + use sqlx::PgPool; + use warden_core::pagination::PaginationArgs; + + use crate::schema::{SchemaDriver, SchemaService}; + + #[sqlx::test( + migrator = "crate::MIGRATOR", + fixtures(path = "../../tests/fixtures", scripts("schema")) + )] + async fn test_forward_pagination(pool: PgPool) -> anyhow::Result<()> { + let get_count = 2; + + // 1. Request first 2 + let args = PaginationArgs { + first: Some(get_count), + after: None, + last: None, + before: None, + }; + let limit = 10; + let driver = SchemaService { database: pool }; + + let res = driver.list_schemas(&args, limit).await?; + + assert_eq!(res.edges.len(), get_count as usize); + dbg!(&res); + assert_eq!(res.edges[0].node.schema_type, "payment"); + assert_eq!(res.edges[0].node.schema_version, "1.0.0"); + assert!(res.page_info.has_next_page); + + // 2. Request next 2 using the end_cursor + let args_next = PaginationArgs { + first: Some(2), + after: res.page_info.end_cursor, + last: None, + before: None, + }; + + let res_next = driver.list_schemas(&args_next, limit).await?; + + assert_eq!(res_next.edges.len(), 2); + assert_eq!(res_next.edges[0].node.schema_type, "payment"); + assert_eq!(res_next.edges[1].node.schema_type, "refund"); + Ok(()) + } + + #[sqlx::test( + migrator = "crate::MIGRATOR", + fixtures(path = "../../tests/fixtures", scripts("schema")) + )] + async fn test_backward_pagination(pool: PgPool) -> anyhow::Result<()> { + let get_count = 2; + + let cursor = general_purpose::STANDARD.encode("4,refund,1.2.0"); + + let args = PaginationArgs { + first: None, + after: None, + last: Some(get_count), + before: Some(cursor), + }; + + let limit = 10; + let driver = SchemaService { database: pool }; + + let res = driver.list_schemas(&args, limit).await?; + + assert_eq!(res.edges.len(), get_count as usize); + assert_eq!(res.edges[0].node.schema_type, "refund"); + assert_eq!(res.edges[0].node.schema_version, "1.0.0"); + assert_eq!(res.edges[1].node.schema_type, "payment"); + assert_eq!(res.edges[1].node.schema_version, "1.1.0"); + Ok(()) + } +} diff --git a/lib/api-config/src/schema/mod.rs b/lib/api-config/src/schema/mod.rs index d16ee9f..54bc015 100644 --- a/lib/api-config/src/schema/mod.rs +++ b/lib/api-config/src/schema/mod.rs @@ -2,6 +2,8 @@ mod create_schema; mod delete_schema; mod get_schema; mod implementation; +mod list_schemas; +mod pagination; mod update_schema; pub use create_schema::CreateSchema; use sqlx::PgPool; @@ -10,11 +12,13 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use time::OffsetDateTime; +use warden_core::pagination::Connection; +use warden_core::pagination::PaginationArgs; use crate::ConfigurationError; -/// Transaction to monitor -#[derive(Deserialize, Debug, Serialize)] +/// Transaction's schema +#[derive(Clone, Deserialize, Debug, Serialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] #[cfg_attr(feature = "utoipa", schema(example = json!({ "schemaType": "custom.schema", @@ -49,6 +53,9 @@ use crate::ConfigurationError; })))] #[serde(rename_all = "camelCase")] pub struct TransactionSchema { + /// Transaction schema id + #[serde(skip)] + pub id: i64, /// Transaction schema type pub schema_type: String, /// The schema's version @@ -63,8 +70,16 @@ pub struct TransactionSchema { pub updated_at: OffsetDateTime, } +/// The type implementing [SchemaDriver] +#[derive(Debug)] pub struct SchemaService { - pub database: PgPool, + pub(crate) database: PgPool, +} + +impl SchemaService { + pub fn new(database: PgPool) -> Self { + Self { database } + } } #[async_trait] @@ -91,12 +106,11 @@ pub trait SchemaDriver: Send + Sync { schema: &serde_json::Value, ) -> Result<Option<TransactionSchema>, ConfigurationError>; - async fn get_schemas( + async fn list_schemas( &self, + input: &PaginationArgs, limit: i64, - first: Option<i64>, - after: Option<&str>, - ) -> Result<Vec<TransactionSchema>, ConfigurationError>; + ) -> Result<Connection<TransactionSchema>, ConfigurationError>; } #[cfg(test)] @@ -110,7 +124,7 @@ mod tests { fixtures(path = "../../tests/fixtures", scripts("schema")) )] async fn schema(pool: PgPool) -> anyhow::Result<()> { - let driver = SchemaService { database: pool }; + let driver = SchemaService::new(pool); // 2. Define Fixtures let kind = "fin_tx_v1"; @@ -131,6 +145,10 @@ mod tests { .expect("Create failed"); assert_eq!(created.schema_type, kind); + // CREATE AGAIN SHOULD FAIL (CONFLICTING IDs) + let created = driver.create_schema(kind, version, &min_schema).await; + assert!(created.is_err()); + // GET let found = driver .get_schema("payment", "1.0.0") diff --git a/lib/api-config/src/schema/pagination.rs b/lib/api-config/src/schema/pagination.rs new file mode 100644 index 0000000..caaf726 --- /dev/null +++ b/lib/api-config/src/schema/pagination.rs @@ -0,0 +1,9 @@ +#[derive(Debug)] +pub struct DecodedSchemaPagination { + pub(crate) type_cursor: Option<String>, + pub(crate) version_cursor: Option<String>, + pub(crate) limit: i64, + pub(crate) base_limit: i64, + pub(crate) is_last: bool, + pub(crate) id: Option<i64>, +} diff --git a/lib/api-config/tests/fixtures/schema.sql b/lib/api-config/tests/fixtures/schema.sql index 4ec082e..f948735 100644 --- a/lib/api-config/tests/fixtures/schema.sql +++ b/lib/api-config/tests/fixtures/schema.sql @@ -1,12 +1,27 @@ insert into transaction_schema (schema_type, schema_version, schema) -values +values ( - 'payment', - '1.0.0', + 'payment', + '1.0.0', '{"type": "object", "required": ["amount", "currency"], "properties": {"amount": {"type": "number"}, "currency": {"type": "string"}}}' ), ( - 'refund', - '1.0.0', + 'refund', + '1.0.0', '{"type": "object", "required": ["original_txn_id"], "properties": {"original_txn_id": {"type": "string"}}}' +), +( + 'payment', + '1.1.0', + '{}' +), +( + 'refund', + '1.2.0', + '{}' +), +( + 'refund', + '1.3.0', + '{}' ); diff --git a/lib/warden-core/Cargo.toml b/lib/warden-core/Cargo.toml index 49b7186..c9b1aa7 100644 --- a/lib/warden-core/Cargo.toml +++ b/lib/warden-core/Cargo.toml @@ -9,12 +9,14 @@ homepage.workspace = true publish.workspace = true [dependencies] +base64.workspace = true clap = { workspace = true, features = ["derive", "env"] } serde = { workspace = true, features = ["derive"] } thiserror.workspace = true tracing.workspace = true tracing-subscriber = { version = "0.3.23", features = ["env-filter"] } url = { workspace = true, features = ["serde"] } +utoipa.workspace = true [dependencies.sqlx] workspace = true diff --git a/lib/warden-core/src/config/cli/database.rs b/lib/warden-core/src/config/cli/database.rs index 31ba930..70bf600 100644 --- a/lib/warden-core/src/config/cli/database.rs +++ b/lib/warden-core/src/config/cli/database.rs @@ -117,7 +117,7 @@ impl Database { } let host = "localhost".to_owned(); - let host = self.database_host.as_ref().unwrap_or_else(|| &host); + let host = self.database_host.as_ref().unwrap_or(&host); let mut url = Url::parse(&format!("postgres://{host}"))?; if let Some(ref u) = self.database_username { diff --git a/lib/warden-core/src/error.rs b/lib/warden-core/src/error.rs index d90a862..e347407 100644 --- a/lib/warden-core/src/error.rs +++ b/lib/warden-core/src/error.rs @@ -8,6 +8,10 @@ pub enum WardenError { Migration(#[from] sqlx::migrate::MigrateError), #[error(transparent)] Url(#[from] url::ParseError), + #[error(transparent)] + Pagination(#[from] base64::DecodeError), + #[error(transparent)] + PaginationCursor(#[from] std::string::FromUtf8Error), #[error("Missing required configuration values:\n`{0}`")] Config(String), #[error("invalid header (expected {expected:?}, found {found:?})")] diff --git a/lib/warden-core/src/lib.rs b/lib/warden-core/src/lib.rs index f200ba1..627659e 100644 --- a/lib/warden-core/src/lib.rs +++ b/lib/warden-core/src/lib.rs @@ -1,3 +1,4 @@ mod error; pub use error::WardenError; pub mod config; +pub mod pagination; diff --git a/lib/warden-core/src/pagination/mod.rs b/lib/warden-core/src/pagination/mod.rs new file mode 100644 index 0000000..25fb083 --- /dev/null +++ b/lib/warden-core/src/pagination/mod.rs @@ -0,0 +1,58 @@ +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +/// Arguments used for cursor-based pagination. +#[derive(Deserialize, Debug, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct PaginationArgs { + /// Returns the first `n` items from the list. + pub first: Option<i64>, + + /// A cursor pointing to the position after which items should be returned. + pub after: Option<String>, + + /// Returns the last `n` items from the list. + pub last: Option<i64>, + + /// A cursor pointing to the position before which items should be returned. + pub before: Option<String>, +} + +/// Metadata describing the current page of results. +#[derive(Serialize, Debug, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct PageInfo { + /// Indicates whether there are more items when paginating forward. + pub has_next_page: bool, + + /// Indicates whether there are more items when paginating backward. + pub has_previous_page: bool, + + /// The cursor corresponding to the first item in the current page. + pub start_cursor: Option<String>, + + /// The cursor corresponding to the last item in the current page. + pub end_cursor: Option<String>, +} + +/// A paginated connection containing edges and pagination metadata. +#[derive(Serialize, Debug, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct Connection<T> { + /// A list of edges, each containing a node and its cursor. + pub edges: Vec<Edge<T>>, + + /// Information about pagination for this connection. + pub page_info: PageInfo, +} + +/// An edge in a connection, representing a node and its cursor. +#[derive(Serialize, Debug, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct Edge<T> { + /// The item/node contained in this edge. + pub node: T, + + /// A cursor for this node, used in pagination. + pub cursor: String, +} diff --git a/migrations/20260329120645_transaction_schema.sql b/migrations/20260329120645_transaction_schema.sql index 8496c80..7868591 100644 --- a/migrations/20260329120645_transaction_schema.sql +++ b/migrations/20260329120645_transaction_schema.sql @@ -1,6 +1,7 @@ -- Add migration script here -- The transaction's blueprint to be checked on each request create table transaction_schema ( + id bigserial primary key, -- The transaction type schema_type text not null, -- The schema's version (to allow for multiple revisions - maybe) @@ -9,7 +10,7 @@ create table transaction_schema ( schema jsonb not null, created_at timestamptz not null default now(), updated_at timestamptz not null default now(), - primary key (schema_type, schema_version) + unique (schema_type, schema_version) ); create trigger update_transaction_schema_modtime diff --git a/warden/Cargo.toml b/warden/Cargo.toml index d4dcf8b..47cd8b6 100644 --- a/warden/Cargo.toml +++ b/warden/Cargo.toml @@ -9,7 +9,7 @@ homepage.workspace = true [dependencies] anyhow.workspace = true axum = { version = "0.8.8", features = ["macros"] } -base64 = "0.22.1" +base64.workspace = true clap.workspace = true jsonschema.workspace = true secrecy = { version = "0.10.3", features = ["serde"] } diff --git a/warden/src/main.rs b/warden/src/main.rs index e155dee..59068f5 100644 --- a/warden/src/main.rs +++ b/warden/src/main.rs @@ -37,9 +37,7 @@ async fn main() -> anyhow::Result<()> { let (log_handle, _guard) = logging::initialise_logging(&config.server.log_level, &config.server.log_dir); - let schema = SchemaService { - database: state::database::connect(&config.database).await?, - }; + let schema = SchemaService::new(state::database::connect(&config.database).await?); let schema = Arc::new(schema); let state = state::AppState::new(log_handle, schema).await?; diff --git a/warden/src/server/api/mod.rs b/warden/src/server/api/mod.rs index f5aa0c8..963a3ef 100644 --- a/warden/src/server/api/mod.rs +++ b/warden/src/server/api/mod.rs @@ -1,4 +1,3 @@ -pub mod pagination; pub mod transaction; pub mod version; diff --git a/warden/src/server/api/pagination.rs b/warden/src/server/api/pagination.rs deleted file mode 100644 index 7341d2f..0000000 --- a/warden/src/server/api/pagination.rs +++ /dev/null @@ -1,21 +0,0 @@ -use api_config::schema::TransactionSchema; -use serde::{Deserialize, Serialize}; -use utoipa::{IntoParams, ToSchema}; - -#[derive(Deserialize, Debug, IntoParams, ToSchema)] -pub struct PaginationParams { - pub first: Option<i64>, - pub after: Option<String>, // This is our Base64 cursor -} - -#[derive(Serialize, ToSchema)] -pub struct PageInfo { - pub has_next_page: bool, - pub end_cursor: Option<String>, -} - -#[derive(Serialize, ToSchema)] -pub struct RelayResponse { - pub nodes: Vec<TransactionSchema>, - pub page_info: PageInfo, -} diff --git a/warden/src/server/api/transaction.rs b/warden/src/server/api/transaction.rs index 03f8a1f..7db8528 100644 --- a/warden/src/server/api/transaction.rs +++ b/warden/src/server/api/transaction.rs @@ -4,6 +4,7 @@ use utoipa::ToSchema; #[derive(Deserialize, Serialize, ToSchema)] /// Transaction to monitor +#[serde(rename_all = "camelCase")] pub struct Transaction { /// Transaction schema type pub schema_type: String, diff --git a/warden/src/server/mod.rs b/warden/src/server/mod.rs index fe93352..92ac12d 100644 --- a/warden/src/server/mod.rs +++ b/warden/src/server/mod.rs @@ -18,7 +18,11 @@ use crate::{ config::Configuration, server::{ middleware::request_id::{REQUEST_ID_HEADER, middleware_request_id}, - routes::{ApiDoc, config::ConfigDoc, transaction_monitoring::MonitoringDoc}, + routes::{ + ApiDoc, + config::{ConfigDoc, schema::SchemaDoc}, + transaction_monitoring::MonitoringDoc, + }, }, state::AppState, }; @@ -32,6 +36,7 @@ pub async fn router(state: Arc<AppState>, config: &Configuration) -> Router<()> let mut doc = ApiDoc::openapi(); doc.merge(ConfigDoc::openapi()); + doc.merge(SchemaDoc::openapi()); doc.merge(MonitoringDoc::openapi()); let stubs = OpenApiRouter::with_openapi(doc) diff --git a/warden/src/server/routes/config/schema/create.rs b/warden/src/server/routes/config/schema/create.rs index aa22546..767ff3f 100644 --- a/warden/src/server/routes/config/schema/create.rs +++ b/warden/src/server/routes/config/schema/create.rs @@ -13,7 +13,7 @@ use crate::{ server::{ api::version::{Version, VersionPath}, error::AppError, - routes::config::CONFIG, + routes::config::schema::SCHEMA, }, state::AppState, }; @@ -60,7 +60,7 @@ use crate::{ ) ), operation_id = "create_schema", // https://github.com/juhaku/utoipa/issues/1170 - tag = CONFIG, + tag = SCHEMA, request_body( content = CreateSchema ), diff --git a/warden/src/server/routes/config/schema/delete.rs b/warden/src/server/routes/config/schema/delete.rs index 98fd581..55577ae 100644 --- a/warden/src/server/routes/config/schema/delete.rs +++ b/warden/src/server/routes/config/schema/delete.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use api_config::schema::SchemaDriver; use axum::{ debug_handler, extract::{Path, Query, State}, @@ -14,7 +13,7 @@ use crate::{ server::{ api::version::{Version, VersionPath}, error::AppError, - routes::config::CONFIG, + routes::config::schema::SCHEMA, }, state::AppState, }; @@ -65,7 +64,7 @@ pub struct SchemaDeleteQuery { ) ), operation_id = "delete_schema", // https://github.com/juhaku/utoipa/issues/1170 - tag = CONFIG, + tag = SCHEMA, path = "/{apiVersion}/config/schema", params(VersionPath, SchemaDeleteQuery), )] diff --git a/warden/src/server/routes/config/schema/mod.rs b/warden/src/server/routes/config/schema/mod.rs index 8c84f10..17db5ce 100644 --- a/warden/src/server/routes/config/schema/mod.rs +++ b/warden/src/server/routes/config/schema/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use utoipa::OpenApi; use utoipa_axum::router::OpenApiRouter; use crate::state::AppState; @@ -9,6 +10,12 @@ pub mod delete; pub mod read; pub mod update; +const SCHEMA: &str = "Schema"; + +#[derive(OpenApi)] +#[openapi(tags((name = SCHEMA, description = "JSON schemas that each monitoring request is validated against")))] +pub struct SchemaDoc; + pub fn router(store: Arc<AppState>) -> OpenApiRouter { OpenApiRouter::new() .routes(utoipa_axum::routes!(create::create_schema)) diff --git a/warden/src/server/routes/config/schema/read.rs b/warden/src/server/routes/config/schema/read.rs index c89583b..2d12935 100644 --- a/warden/src/server/routes/config/schema/read.rs +++ b/warden/src/server/routes/config/schema/read.rs @@ -1,23 +1,20 @@ use std::sync::Arc; -use api_config::schema::{SchemaDriver, TransactionSchema}; +use api_config::schema::TransactionSchema; use axum::{ Json, debug_handler, extract::{Path, Query, State}, http::{HeaderMap, StatusCode}, response::IntoResponse, }; -use base64::{Engine, engine::general_purpose}; use tracing::debug; +use warden_core::pagination::{Connection, PaginationArgs}; use crate::{ server::{ - api::{ - pagination::{PageInfo, PaginationParams, RelayResponse}, - version::{Version, VersionPath}, - }, + api::version::{Version, VersionPath}, error::AppError, - routes::config::CONFIG, + routes::config::schema::SCHEMA, }, state::AppState, }; @@ -64,7 +61,7 @@ use crate::{ ) ), operation_id = "get_schema", // https://github.com/juhaku/utoipa/issues/1170 - tag = CONFIG, + tag = SCHEMA, path = "/{apiVersion}/config/schema/{schemaType}/{schemaVersion}", params( VersionPath, @@ -116,7 +113,7 @@ pub async fn get_schema( } } -/// Get transaction schemas +/// List transaction schemas #[utoipa::path( get, responses( @@ -126,7 +123,7 @@ pub async fn get_schema( headers( ("x-request-id" = Uuid, description = "Request identifier") ), - body = RelayResponse + body = Connection<TransactionSchema> ), ( status = 400, @@ -158,7 +155,7 @@ pub async fn get_schema( ) ), operation_id = "get_schemas", // https://github.com/juhaku/utoipa/issues/1170 - tag = CONFIG, + tag = SCHEMA, path = "/{apiVersion}/config/schema", params(VersionPath), )] @@ -175,22 +172,15 @@ pub async fn get_schemas( State(state): State<Arc<AppState>>, headers: HeaderMap, Path(version): Path<Version>, - params: Query<PaginationParams>, + params: Query<PaginationArgs>, ) -> Result<impl IntoResponse, AppError> { debug!("searching for schema"); - let mut cursor = None; let limit = 10; - // 1. Decode Cursor - if let Some(ref cursor_str) = params.after { - let decoded = general_purpose::STANDARD.decode(cursor_str).unwrap(); - cursor = Some(String::from_utf8(decoded).unwrap()); - } - // TODO: get from cache let rows = state .schema_service - .get_schemas(limit, params.first, cursor.as_deref()) + .list_schemas(¶ms, limit) .await .map_err(|e| match e { api_config::ConfigurationError::Database(ref error) => match error { @@ -205,20 +195,5 @@ pub async fn get_schemas( _ => e.into(), })?; - // 3. Process Relay Metadata - let has_next_page = rows.len() > limit as usize; - let nodes: Vec<TransactionSchema> = rows.into_iter().take(limit as usize).collect(); - - let end_cursor = nodes.last().map(|node| { - let raw_cursor = format!("{},{}", node.schema_type, node.schema_version); - general_purpose::STANDARD.encode(raw_cursor) - }); - - Ok(Json(RelayResponse { - nodes, - page_info: PageInfo { - has_next_page, - end_cursor, - }, - })) + Ok(Json(rows)) } diff --git a/warden/src/server/routes/config/schema/update.rs b/warden/src/server/routes/config/schema/update.rs index 13f9fe9..ff518a7 100644 --- a/warden/src/server/routes/config/schema/update.rs +++ b/warden/src/server/routes/config/schema/update.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use api_config::schema::{CreateSchema, SchemaDriver, TransactionSchema}; +use api_config::schema::{CreateSchema, TransactionSchema}; use axum::{ Json, debug_handler, extract::{Path, State}, @@ -12,7 +12,7 @@ use crate::{ server::{ api::version::{Version, VersionPath}, error::AppError, - routes::config::CONFIG, + routes::config::schema::SCHEMA, }, state::AppState, }; @@ -59,7 +59,7 @@ use crate::{ ) ), operation_id = "update_schema", // https://github.com/juhaku/utoipa/issues/1170 - tag = CONFIG, + tag = SCHEMA, request_body( content = CreateSchema ), diff --git a/warden/src/server/routes/transaction_monitoring/monitor.rs b/warden/src/server/routes/transaction_monitoring/monitor.rs index 66075f5..44cc02c 100644 --- a/warden/src/server/routes/transaction_monitoring/monitor.rs +++ b/warden/src/server/routes/transaction_monitoring/monitor.rs @@ -56,8 +56,8 @@ use axum::{ ] pub async fn reload( - State(state): State<Arc<AppState>>, - Path(version): Path<Version>, + State(_state): State<Arc<AppState>>, + Path(_version): Path<Version>, ValidatedTransaction(body): ValidatedTransaction<serde_json::Value>, ) -> StatusCode { dbg!(&body); diff --git a/warden/src/state/mod.rs b/warden/src/state/mod.rs index 7dfcdc4..a6b36e1 100644 --- a/warden/src/state/mod.rs +++ b/warden/src/state/mod.rs @@ -2,11 +2,8 @@ pub(crate) mod database; use std::sync::Arc; use api_config::schema::SchemaDriver; -use tracing::{debug, trace}; use tracing_subscriber::EnvFilter; -use crate::config::Configuration; - pub type LogHandle = tracing_subscriber::reload::Handle<EnvFilter, tracing_subscriber::Registry>; #[derive(Clone)] |
