From daeb5311840680599a0ce6e49d181b9289010f68 Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Wed, 1 Apr 2026 09:05:33 +0200 Subject: feat(schema): cursor pagination --- lib/api-config/src/error.rs | 6 + lib/api-config/src/lib.rs | 8 +- lib/api-config/src/schema/create_schema.rs | 2 +- lib/api-config/src/schema/implementation.rs | 42 ++---- lib/api-config/src/schema/list_schemas.rs | 213 ++++++++++++++++++++++++++++ lib/api-config/src/schema/mod.rs | 34 +++-- lib/api-config/src/schema/pagination.rs | 9 ++ 7 files changed, 270 insertions(+), 44 deletions(-) create mode 100644 lib/api-config/src/schema/list_schemas.rs create mode 100644 lib/api-config/src/schema/pagination.rs (limited to 'lib/api-config/src') diff --git a/lib/api-config/src/error.rs b/lib/api-config/src/error.rs index b2dc9ad..a6c4991 100644 --- a/lib/api-config/src/error.rs +++ b/lib/api-config/src/error.rs @@ -10,4 +10,10 @@ pub enum ConfigurationError { InvalidHeader { expected: String, found: String }, #[error("unknown data store error")] Unknown, + #[error(transparent)] + Pagination(#[from] base64::DecodeError), + #[error(transparent)] + PaginationCursor(#[from] std::string::FromUtf8Error), + #[error(transparent)] + PaginationId(#[from] std::num::ParseIntError), } diff --git a/lib/api-config/src/lib.rs b/lib/api-config/src/lib.rs index 0d0117a..c2c6ccb 100644 --- a/lib/api-config/src/lib.rs +++ b/lib/api-config/src/lib.rs @@ -1,5 +1,11 @@ +//! Configuration +#![warn(missing_docs, missing_debug_implementations)] mod error; +/// Schema configuration implementation pub mod schema; +/// Errors returned by the configuration library pub use error::ConfigurationError; -pub(crate) static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("../../migrations"); +#[cfg(test)] +/// Database migrator +pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("../../migrations"); diff --git a/lib/api-config/src/schema/create_schema.rs b/lib/api-config/src/schema/create_schema.rs index 493fb09..cdbc91f 100644 --- a/lib/api-config/src/schema/create_schema.rs +++ b/lib/api-config/src/schema/create_schema.rs @@ -6,7 +6,7 @@ use crate::{ schema::{SchemaService, TransactionSchema}, }; -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Debug)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] #[cfg_attr(feature = "utoipa", schema(example = json!({ "schemaType": "custom.schema", diff --git a/lib/api-config/src/schema/implementation.rs b/lib/api-config/src/schema/implementation.rs index c414879..ca0757f 100644 --- a/lib/api-config/src/schema/implementation.rs +++ b/lib/api-config/src/schema/implementation.rs @@ -1,7 +1,10 @@ use async_trait::async_trait; use tracing::debug; +use warden_core::pagination::{Connection, PaginationArgs}; -use crate::schema::{self, SchemaDriver, SchemaService, TransactionSchema}; +use crate::schema::{ + self, SchemaDriver, SchemaService, TransactionSchema, pagination::DecodedSchemaPagination, +}; #[async_trait] impl SchemaDriver for SchemaService { @@ -44,41 +47,12 @@ impl SchemaDriver for SchemaService { } #[tracing::instrument(skip(self))] - async fn get_schemas( + async fn list_schemas( &self, + input: &PaginationArgs, limit: i64, - first: Option, - after: Option<&str>, - ) -> Result, crate::ConfigurationError> { + ) -> Result, crate::ConfigurationError> { debug!("getting transaction schemas"); - let limit = first.unwrap_or(limit); - let mut last_type = String::default(); - let mut last_version = String::default(); - - if let Some(s) = after { - let parts: Vec<&str> = s.split(',').collect(); - if parts.len() == 2 { - last_type = parts[0].to_string(); - last_version = parts[1].to_string(); - } - } - - let rows = sqlx::query_as!( - TransactionSchema, - " - select * - from transaction_schema - where ($1 = '' or (schema_type, schema_version) > ($1, $2)) - order by schema_type asc, schema_version asc - limit $3 - ", - &last_type, - &last_version, - limit + 1 - ) - .fetch_all(&self.database) - .await?; - - Ok(rows) + schema::list_schemas::list_schemas(self, input, limit).await } } diff --git a/lib/api-config/src/schema/list_schemas.rs b/lib/api-config/src/schema/list_schemas.rs new file mode 100644 index 0000000..0b539ed --- /dev/null +++ b/lib/api-config/src/schema/list_schemas.rs @@ -0,0 +1,213 @@ +use base64::{Engine, engine::general_purpose}; +use tracing::debug; +use warden_core::pagination::{Connection, Edge, PageInfo, PaginationArgs}; + +use crate::{ + ConfigurationError, + schema::{SchemaService, TransactionSchema, pagination::DecodedSchemaPagination}, +}; + +fn decode_cursors( + args: &PaginationArgs, + limit: i64, +) -> Result { + let limit = args + .first + .or(args.last) + .map(|value| if value > limit { limit } else { value }) + .unwrap_or(limit); + + // Fetch limit + 1 to check if there's a next/prev page + let query_limit = limit + 1; + + let (id, type_cursor, ver_cursor) = + if let Some(cursor) = args.after.as_ref().or(args.before.as_ref()) { + let decoded = general_purpose::STANDARD.decode(cursor)?; + let s = String::from_utf8(decoded)?; + let parts: Vec<&str> = s.split(',').collect(); + ( + Some(parts[0].parse()?), + Some(parts[1].to_string()), + Some(parts[2].to_string()), + ) + } else { + (None, None, None) + }; + + Ok(DecodedSchemaPagination { + type_cursor, + version_cursor: ver_cursor, + limit: query_limit, + is_last: args.last.is_some(), + base_limit: limit, + id, + }) +} + +pub(super) async fn list_schemas( + state: &SchemaService, + args: &PaginationArgs, + limit: i64, +) -> Result, ConfigurationError> { + debug!("listing schemas transaction schema"); + + let input = decode_cursors(args, limit)?; + + let rows = if input.is_last { + // BACKWARD PAGINATION + sqlx::query_as!( + TransactionSchema, + " + select * from ( + select * from transaction_schema + where ($1::bigint is null or (id, schema_type, schema_version) < ($1, $2, $3)) + order by id desc, schema_type desc, schema_version desc + limit $4 + ) sub + order by id asc, schema_type asc, schema_version asc + ", + input.id, + input.type_cursor, + input.version_cursor, + input.limit + ) + .fetch_all(&state.database) + .await? + } else { + // FORWARD PAGINATION (Default) + sqlx::query_as!( + TransactionSchema, + " + select * from transaction_schema + where ($1::bigint is null or (id, schema_type, schema_version) > ($1, $2, $3)) + order by id asc, schema_type asc, schema_version asc + limit $4 + ", + input.id, + input.type_cursor, + input.version_cursor, + input.limit + ) + .fetch_all(&state.database) + .await? + }; + + let res = encode_cursors(&rows, &input)?; + + Ok(res) +} + +fn encode_cursors( + rows: &[TransactionSchema], + args: &DecodedSchemaPagination, +) -> Result, ConfigurationError> { + let has_extra = rows.len() > args.base_limit as usize; + let final_rows = if has_extra { + if args.is_last { + &rows[1..] + } else { + &rows[..args.base_limit as usize] + } + } else { + rows + }; + + let edges = final_rows + .iter() + .map(|row| { + let raw_cursor = format!("{},{},{}", row.id, row.schema_type, row.schema_version); + Edge { + node: row.clone(), + cursor: general_purpose::STANDARD.encode(raw_cursor), + } + }) + .collect::>(); + + let page_info = PageInfo { + has_next_page: if args.is_last { false } else { has_extra }, + has_previous_page: if args.is_last { has_extra } else { false }, + start_cursor: edges.first().map(|e| e.cursor.clone()), + end_cursor: edges.last().map(|e| e.cursor.clone()), + }; + + Ok(Connection { edges, page_info }) +} + +#[cfg(test)] +mod tests { + use base64::{Engine, engine::general_purpose}; + use sqlx::PgPool; + use warden_core::pagination::PaginationArgs; + + use crate::schema::{SchemaDriver, SchemaService}; + + #[sqlx::test( + migrator = "crate::MIGRATOR", + fixtures(path = "../../tests/fixtures", scripts("schema")) + )] + async fn test_forward_pagination(pool: PgPool) -> anyhow::Result<()> { + let get_count = 2; + + // 1. Request first 2 + let args = PaginationArgs { + first: Some(get_count), + after: None, + last: None, + before: None, + }; + let limit = 10; + let driver = SchemaService { database: pool }; + + let res = driver.list_schemas(&args, limit).await?; + + assert_eq!(res.edges.len(), get_count as usize); + dbg!(&res); + assert_eq!(res.edges[0].node.schema_type, "payment"); + assert_eq!(res.edges[0].node.schema_version, "1.0.0"); + assert!(res.page_info.has_next_page); + + // 2. Request next 2 using the end_cursor + let args_next = PaginationArgs { + first: Some(2), + after: res.page_info.end_cursor, + last: None, + before: None, + }; + + let res_next = driver.list_schemas(&args_next, limit).await?; + + assert_eq!(res_next.edges.len(), 2); + assert_eq!(res_next.edges[0].node.schema_type, "payment"); + assert_eq!(res_next.edges[1].node.schema_type, "refund"); + Ok(()) + } + + #[sqlx::test( + migrator = "crate::MIGRATOR", + fixtures(path = "../../tests/fixtures", scripts("schema")) + )] + async fn test_backward_pagination(pool: PgPool) -> anyhow::Result<()> { + let get_count = 2; + + let cursor = general_purpose::STANDARD.encode("4,refund,1.2.0"); + + let args = PaginationArgs { + first: None, + after: None, + last: Some(get_count), + before: Some(cursor), + }; + + let limit = 10; + let driver = SchemaService { database: pool }; + + let res = driver.list_schemas(&args, limit).await?; + + assert_eq!(res.edges.len(), get_count as usize); + assert_eq!(res.edges[0].node.schema_type, "refund"); + assert_eq!(res.edges[0].node.schema_version, "1.0.0"); + assert_eq!(res.edges[1].node.schema_type, "payment"); + assert_eq!(res.edges[1].node.schema_version, "1.1.0"); + Ok(()) + } +} diff --git a/lib/api-config/src/schema/mod.rs b/lib/api-config/src/schema/mod.rs index d16ee9f..54bc015 100644 --- a/lib/api-config/src/schema/mod.rs +++ b/lib/api-config/src/schema/mod.rs @@ -2,6 +2,8 @@ mod create_schema; mod delete_schema; mod get_schema; mod implementation; +mod list_schemas; +mod pagination; mod update_schema; pub use create_schema::CreateSchema; use sqlx::PgPool; @@ -10,11 +12,13 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use time::OffsetDateTime; +use warden_core::pagination::Connection; +use warden_core::pagination::PaginationArgs; use crate::ConfigurationError; -/// Transaction to monitor -#[derive(Deserialize, Debug, Serialize)] +/// Transaction's schema +#[derive(Clone, Deserialize, Debug, Serialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] #[cfg_attr(feature = "utoipa", schema(example = json!({ "schemaType": "custom.schema", @@ -49,6 +53,9 @@ use crate::ConfigurationError; })))] #[serde(rename_all = "camelCase")] pub struct TransactionSchema { + /// Transaction schema id + #[serde(skip)] + pub id: i64, /// Transaction schema type pub schema_type: String, /// The schema's version @@ -63,8 +70,16 @@ pub struct TransactionSchema { pub updated_at: OffsetDateTime, } +/// The type implementing [SchemaDriver] +#[derive(Debug)] pub struct SchemaService { - pub database: PgPool, + pub(crate) database: PgPool, +} + +impl SchemaService { + pub fn new(database: PgPool) -> Self { + Self { database } + } } #[async_trait] @@ -91,12 +106,11 @@ pub trait SchemaDriver: Send + Sync { schema: &serde_json::Value, ) -> Result, ConfigurationError>; - async fn get_schemas( + async fn list_schemas( &self, + input: &PaginationArgs, limit: i64, - first: Option, - after: Option<&str>, - ) -> Result, ConfigurationError>; + ) -> Result, ConfigurationError>; } #[cfg(test)] @@ -110,7 +124,7 @@ mod tests { fixtures(path = "../../tests/fixtures", scripts("schema")) )] async fn schema(pool: PgPool) -> anyhow::Result<()> { - let driver = SchemaService { database: pool }; + let driver = SchemaService::new(pool); // 2. Define Fixtures let kind = "fin_tx_v1"; @@ -131,6 +145,10 @@ mod tests { .expect("Create failed"); assert_eq!(created.schema_type, kind); + // CREATE AGAIN SHOULD FAIL (CONFLICTING IDs) + let created = driver.create_schema(kind, version, &min_schema).await; + assert!(created.is_err()); + // GET let found = driver .get_schema("payment", "1.0.0") diff --git a/lib/api-config/src/schema/pagination.rs b/lib/api-config/src/schema/pagination.rs new file mode 100644 index 0000000..caaf726 --- /dev/null +++ b/lib/api-config/src/schema/pagination.rs @@ -0,0 +1,9 @@ +#[derive(Debug)] +pub struct DecodedSchemaPagination { + pub(crate) type_cursor: Option, + pub(crate) version_cursor: Option, + pub(crate) limit: i64, + pub(crate) base_limit: i64, + pub(crate) is_last: bool, + pub(crate) id: Option, +} -- cgit v1.2.3