aboutsummaryrefslogtreecommitdiffstats
path: root/lib/api-config/src/schema
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2026-04-01 09:05:33 +0200
committerrtkay123 <dev@kanjala.com>2026-04-01 09:05:33 +0200
commitdaeb5311840680599a0ce6e49d181b9289010f68 (patch)
treed0c9c040ca003a6d431781b867c4290cbe5c9ef2 /lib/api-config/src/schema
parent2c336f0339747aa77a8fe6613b83200c8d4902a5 (diff)
downloadwarden-daeb5311840680599a0ce6e49d181b9289010f68.tar.bz2
warden-daeb5311840680599a0ce6e49d181b9289010f68.zip
feat(schema): cursor pagination
Diffstat (limited to 'lib/api-config/src/schema')
-rw-r--r--lib/api-config/src/schema/create_schema.rs2
-rw-r--r--lib/api-config/src/schema/implementation.rs42
-rw-r--r--lib/api-config/src/schema/list_schemas.rs213
-rw-r--r--lib/api-config/src/schema/mod.rs34
-rw-r--r--lib/api-config/src/schema/pagination.rs9
5 files changed, 257 insertions, 43 deletions
diff --git a/lib/api-config/src/schema/create_schema.rs b/lib/api-config/src/schema/create_schema.rs
index 493fb09..cdbc91f 100644
--- a/lib/api-config/src/schema/create_schema.rs
+++ b/lib/api-config/src/schema/create_schema.rs
@@ -6,7 +6,7 @@ use crate::{
schema::{SchemaService, TransactionSchema},
};
-#[derive(Deserialize, Serialize)]
+#[derive(Deserialize, Serialize, Debug)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "utoipa", schema(example = json!({
"schemaType": "custom.schema",
diff --git a/lib/api-config/src/schema/implementation.rs b/lib/api-config/src/schema/implementation.rs
index c414879..ca0757f 100644
--- a/lib/api-config/src/schema/implementation.rs
+++ b/lib/api-config/src/schema/implementation.rs
@@ -1,7 +1,10 @@
use async_trait::async_trait;
use tracing::debug;
+use warden_core::pagination::{Connection, PaginationArgs};
-use crate::schema::{self, SchemaDriver, SchemaService, TransactionSchema};
+use crate::schema::{
+ self, SchemaDriver, SchemaService, TransactionSchema, pagination::DecodedSchemaPagination,
+};
#[async_trait]
impl SchemaDriver for SchemaService {
@@ -44,41 +47,12 @@ impl SchemaDriver for SchemaService {
}
#[tracing::instrument(skip(self))]
- async fn get_schemas(
+ async fn list_schemas(
&self,
+ input: &PaginationArgs,
limit: i64,
- first: Option<i64>,
- after: Option<&str>,
- ) -> Result<Vec<TransactionSchema>, crate::ConfigurationError> {
+ ) -> Result<Connection<TransactionSchema>, crate::ConfigurationError> {
debug!("getting transaction schemas");
- let limit = first.unwrap_or(limit);
- let mut last_type = String::default();
- let mut last_version = String::default();
-
- if let Some(s) = after {
- let parts: Vec<&str> = s.split(',').collect();
- if parts.len() == 2 {
- last_type = parts[0].to_string();
- last_version = parts[1].to_string();
- }
- }
-
- let rows = sqlx::query_as!(
- TransactionSchema,
- "
- select *
- from transaction_schema
- where ($1 = '' or (schema_type, schema_version) > ($1, $2))
- order by schema_type asc, schema_version asc
- limit $3
- ",
- &last_type,
- &last_version,
- limit + 1
- )
- .fetch_all(&self.database)
- .await?;
-
- Ok(rows)
+ schema::list_schemas::list_schemas(self, input, limit).await
}
}
diff --git a/lib/api-config/src/schema/list_schemas.rs b/lib/api-config/src/schema/list_schemas.rs
new file mode 100644
index 0000000..0b539ed
--- /dev/null
+++ b/lib/api-config/src/schema/list_schemas.rs
@@ -0,0 +1,213 @@
+use base64::{Engine, engine::general_purpose};
+use tracing::debug;
+use warden_core::pagination::{Connection, Edge, PageInfo, PaginationArgs};
+
+use crate::{
+ ConfigurationError,
+ schema::{SchemaService, TransactionSchema, pagination::DecodedSchemaPagination},
+};
+
+fn decode_cursors(
+ args: &PaginationArgs,
+ limit: i64,
+) -> Result<DecodedSchemaPagination, ConfigurationError> {
+ let limit = args
+ .first
+ .or(args.last)
+ .map(|value| if value > limit { limit } else { value })
+ .unwrap_or(limit);
+
+ // Fetch limit + 1 to check if there's a next/prev page
+ let query_limit = limit + 1;
+
+ let (id, type_cursor, ver_cursor) =
+ if let Some(cursor) = args.after.as_ref().or(args.before.as_ref()) {
+ let decoded = general_purpose::STANDARD.decode(cursor)?;
+ let s = String::from_utf8(decoded)?;
+ let parts: Vec<&str> = s.split(',').collect();
+ (
+ Some(parts[0].parse()?),
+ Some(parts[1].to_string()),
+ Some(parts[2].to_string()),
+ )
+ } else {
+ (None, None, None)
+ };
+
+ Ok(DecodedSchemaPagination {
+ type_cursor,
+ version_cursor: ver_cursor,
+ limit: query_limit,
+ is_last: args.last.is_some(),
+ base_limit: limit,
+ id,
+ })
+}
+
+pub(super) async fn list_schemas(
+ state: &SchemaService,
+ args: &PaginationArgs,
+ limit: i64,
+) -> Result<Connection<TransactionSchema>, ConfigurationError> {
+ debug!("listing schemas transaction schema");
+
+ let input = decode_cursors(args, limit)?;
+
+ let rows = if input.is_last {
+ // BACKWARD PAGINATION
+ sqlx::query_as!(
+ TransactionSchema,
+ "
+ select * from (
+ select * from transaction_schema
+ where ($1::bigint is null or (id, schema_type, schema_version) < ($1, $2, $3))
+ order by id desc, schema_type desc, schema_version desc
+ limit $4
+ ) sub
+ order by id asc, schema_type asc, schema_version asc
+ ",
+ input.id,
+ input.type_cursor,
+ input.version_cursor,
+ input.limit
+ )
+ .fetch_all(&state.database)
+ .await?
+ } else {
+ // FORWARD PAGINATION (Default)
+ sqlx::query_as!(
+ TransactionSchema,
+ "
+ select * from transaction_schema
+ where ($1::bigint is null or (id, schema_type, schema_version) > ($1, $2, $3))
+ order by id asc, schema_type asc, schema_version asc
+ limit $4
+ ",
+ input.id,
+ input.type_cursor,
+ input.version_cursor,
+ input.limit
+ )
+ .fetch_all(&state.database)
+ .await?
+ };
+
+ let res = encode_cursors(&rows, &input)?;
+
+ Ok(res)
+}
+
+fn encode_cursors(
+ rows: &[TransactionSchema],
+ args: &DecodedSchemaPagination,
+) -> Result<Connection<TransactionSchema>, ConfigurationError> {
+ let has_extra = rows.len() > args.base_limit as usize;
+ let final_rows = if has_extra {
+ if args.is_last {
+ &rows[1..]
+ } else {
+ &rows[..args.base_limit as usize]
+ }
+ } else {
+ rows
+ };
+
+ let edges = final_rows
+ .iter()
+ .map(|row| {
+ let raw_cursor = format!("{},{},{}", row.id, row.schema_type, row.schema_version);
+ Edge {
+ node: row.clone(),
+ cursor: general_purpose::STANDARD.encode(raw_cursor),
+ }
+ })
+ .collect::<Vec<_>>();
+
+ let page_info = PageInfo {
+ has_next_page: if args.is_last { false } else { has_extra },
+ has_previous_page: if args.is_last { has_extra } else { false },
+ start_cursor: edges.first().map(|e| e.cursor.clone()),
+ end_cursor: edges.last().map(|e| e.cursor.clone()),
+ };
+
+ Ok(Connection { edges, page_info })
+}
+
+#[cfg(test)]
+mod tests {
+ use base64::{Engine, engine::general_purpose};
+ use sqlx::PgPool;
+ use warden_core::pagination::PaginationArgs;
+
+ use crate::schema::{SchemaDriver, SchemaService};
+
+ #[sqlx::test(
+ migrator = "crate::MIGRATOR",
+ fixtures(path = "../../tests/fixtures", scripts("schema"))
+ )]
+ async fn test_forward_pagination(pool: PgPool) -> anyhow::Result<()> {
+ let get_count = 2;
+
+ // 1. Request first 2
+ let args = PaginationArgs {
+ first: Some(get_count),
+ after: None,
+ last: None,
+ before: None,
+ };
+ let limit = 10;
+ let driver = SchemaService { database: pool };
+
+ let res = driver.list_schemas(&args, limit).await?;
+
+ assert_eq!(res.edges.len(), get_count as usize);
+ dbg!(&res);
+ assert_eq!(res.edges[0].node.schema_type, "payment");
+ assert_eq!(res.edges[0].node.schema_version, "1.0.0");
+ assert!(res.page_info.has_next_page);
+
+ // 2. Request next 2 using the end_cursor
+ let args_next = PaginationArgs {
+ first: Some(2),
+ after: res.page_info.end_cursor,
+ last: None,
+ before: None,
+ };
+
+ let res_next = driver.list_schemas(&args_next, limit).await?;
+
+ assert_eq!(res_next.edges.len(), 2);
+ assert_eq!(res_next.edges[0].node.schema_type, "payment");
+ assert_eq!(res_next.edges[1].node.schema_type, "refund");
+ Ok(())
+ }
+
+ #[sqlx::test(
+ migrator = "crate::MIGRATOR",
+ fixtures(path = "../../tests/fixtures", scripts("schema"))
+ )]
+ async fn test_backward_pagination(pool: PgPool) -> anyhow::Result<()> {
+ let get_count = 2;
+
+ let cursor = general_purpose::STANDARD.encode("4,refund,1.2.0");
+
+ let args = PaginationArgs {
+ first: None,
+ after: None,
+ last: Some(get_count),
+ before: Some(cursor),
+ };
+
+ let limit = 10;
+ let driver = SchemaService { database: pool };
+
+ let res = driver.list_schemas(&args, limit).await?;
+
+ assert_eq!(res.edges.len(), get_count as usize);
+ assert_eq!(res.edges[0].node.schema_type, "refund");
+ assert_eq!(res.edges[0].node.schema_version, "1.0.0");
+ assert_eq!(res.edges[1].node.schema_type, "payment");
+ assert_eq!(res.edges[1].node.schema_version, "1.1.0");
+ Ok(())
+ }
+}
diff --git a/lib/api-config/src/schema/mod.rs b/lib/api-config/src/schema/mod.rs
index d16ee9f..54bc015 100644
--- a/lib/api-config/src/schema/mod.rs
+++ b/lib/api-config/src/schema/mod.rs
@@ -2,6 +2,8 @@ mod create_schema;
mod delete_schema;
mod get_schema;
mod implementation;
+mod list_schemas;
+mod pagination;
mod update_schema;
pub use create_schema::CreateSchema;
use sqlx::PgPool;
@@ -10,11 +12,13 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use time::OffsetDateTime;
+use warden_core::pagination::Connection;
+use warden_core::pagination::PaginationArgs;
use crate::ConfigurationError;
-/// Transaction to monitor
-#[derive(Deserialize, Debug, Serialize)]
+/// Transaction's schema
+#[derive(Clone, Deserialize, Debug, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "utoipa", schema(example = json!({
"schemaType": "custom.schema",
@@ -49,6 +53,9 @@ use crate::ConfigurationError;
})))]
#[serde(rename_all = "camelCase")]
pub struct TransactionSchema {
+ /// Transaction schema id
+ #[serde(skip)]
+ pub id: i64,
/// Transaction schema type
pub schema_type: String,
/// The schema's version
@@ -63,8 +70,16 @@ pub struct TransactionSchema {
pub updated_at: OffsetDateTime,
}
+/// The type implementing [SchemaDriver]
+#[derive(Debug)]
pub struct SchemaService {
- pub database: PgPool,
+ pub(crate) database: PgPool,
+}
+
+impl SchemaService {
+ pub fn new(database: PgPool) -> Self {
+ Self { database }
+ }
}
#[async_trait]
@@ -91,12 +106,11 @@ pub trait SchemaDriver: Send + Sync {
schema: &serde_json::Value,
) -> Result<Option<TransactionSchema>, ConfigurationError>;
- async fn get_schemas(
+ async fn list_schemas(
&self,
+ input: &PaginationArgs,
limit: i64,
- first: Option<i64>,
- after: Option<&str>,
- ) -> Result<Vec<TransactionSchema>, ConfigurationError>;
+ ) -> Result<Connection<TransactionSchema>, ConfigurationError>;
}
#[cfg(test)]
@@ -110,7 +124,7 @@ mod tests {
fixtures(path = "../../tests/fixtures", scripts("schema"))
)]
async fn schema(pool: PgPool) -> anyhow::Result<()> {
- let driver = SchemaService { database: pool };
+ let driver = SchemaService::new(pool);
// 2. Define Fixtures
let kind = "fin_tx_v1";
@@ -131,6 +145,10 @@ mod tests {
.expect("Create failed");
assert_eq!(created.schema_type, kind);
+ // CREATE AGAIN SHOULD FAIL (CONFLICTING IDs)
+ let created = driver.create_schema(kind, version, &min_schema).await;
+ assert!(created.is_err());
+
// GET
let found = driver
.get_schema("payment", "1.0.0")
diff --git a/lib/api-config/src/schema/pagination.rs b/lib/api-config/src/schema/pagination.rs
new file mode 100644
index 0000000..caaf726
--- /dev/null
+++ b/lib/api-config/src/schema/pagination.rs
@@ -0,0 +1,9 @@
+#[derive(Debug)]
+pub struct DecodedSchemaPagination {
+ pub(crate) type_cursor: Option<String>,
+ pub(crate) version_cursor: Option<String>,
+ pub(crate) limit: i64,
+ pub(crate) base_limit: i64,
+ pub(crate) is_last: bool,
+ pub(crate) id: Option<i64>,
+}