aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json48
-rw-r--r--.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json48
-rw-r--r--.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json (renamed from .sqlx/query-6474a8297762d8df4bc7d09d837cfa2daa0110377d70386b7788d6cd20052a90.json)8
-rw-r--r--.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json (renamed from .sqlx/query-e9e130d0192838e5321e0bbd42a3ffa93d3dd52e4d8736551d328d6a36b9f482.json)8
-rw-r--r--.sqlx/query-c97114552378d59c671960f1be916c588b74ad726f0a8308ea692d20a8558888.json (renamed from .sqlx/query-8911a00307267a402475c286f765d23f1e3b87538591837bfe41fff03e7313dd.json)4
-rw-r--r--Cargo.lock5
-rw-r--r--lib/api-config/src/schema/create.rs21
-rw-r--r--lib/api-config/src/schema/mod.rs158
-rw-r--r--lib/warden-core/src/config/cli/mod.rs2
-rw-r--r--migrations/20260329120645_transaction_schema.sql8
-rw-r--r--warden/Cargo.toml1
-rw-r--r--warden/src/server/api/mod.rs1
-rw-r--r--warden/src/server/api/pagination.rs21
-rw-r--r--warden/src/server/api/transaction.rs6
-rw-r--r--warden/src/server/api/version.rs7
-rw-r--r--warden/src/server/middleware/extractors/transaction.rs2
-rw-r--r--warden/src/server/routes/config/logs.rs9
-rw-r--r--warden/src/server/routes/config/schema/create.rs13
-rw-r--r--warden/src/server/routes/config/schema/delete.rs21
-rw-r--r--warden/src/server/routes/config/schema/mod.rs1
-rw-r--r--warden/src/server/routes/config/schema/read.rs158
-rw-r--r--warden/src/server/routes/config/schema/update.rs57
-rw-r--r--warden/src/server/routes/transaction_monitoring/monitor.rs15
23 files changed, 462 insertions, 160 deletions
diff --git a/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json b/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json
new file mode 100644
index 0000000..0fa6500
--- /dev/null
+++ b/.sqlx/query-3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2.json
@@ -0,0 +1,48 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "\n update\n transaction_schema\n set \n schema = $3\n where \n schema_type = $1 \n and schema_version = $2\n returning *\n ",
+ "describe": {
+ "columns": [
+ {
+ "ordinal": 0,
+ "name": "schema_type",
+ "type_info": "Text"
+ },
+ {
+ "ordinal": 1,
+ "name": "schema_version",
+ "type_info": "Varchar"
+ },
+ {
+ "ordinal": 2,
+ "name": "schema",
+ "type_info": "Jsonb"
+ },
+ {
+ "ordinal": 3,
+ "name": "created_at",
+ "type_info": "Timestamptz"
+ },
+ {
+ "ordinal": 4,
+ "name": "updated_at",
+ "type_info": "Timestamptz"
+ }
+ ],
+ "parameters": {
+ "Left": [
+ "Text",
+ "Text",
+ "Jsonb"
+ ]
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "3bceb45ae115dd44778ad979e276f0d2a525b5d51d32ac2c9c8ced801e273ad2"
+}
diff --git a/.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json b/.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json
new file mode 100644
index 0000000..cee9aca
--- /dev/null
+++ b/.sqlx/query-48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f.json
@@ -0,0 +1,48 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "\n select *\n from transaction_schema\n where ($1 = '' or (schema_type, schema_version) > ($1, $2))\n order by schema_type asc, schema_version asc\n limit $3\n ",
+ "describe": {
+ "columns": [
+ {
+ "ordinal": 0,
+ "name": "schema_type",
+ "type_info": "Text"
+ },
+ {
+ "ordinal": 1,
+ "name": "schema_version",
+ "type_info": "Varchar"
+ },
+ {
+ "ordinal": 2,
+ "name": "schema",
+ "type_info": "Jsonb"
+ },
+ {
+ "ordinal": 3,
+ "name": "created_at",
+ "type_info": "Timestamptz"
+ },
+ {
+ "ordinal": 4,
+ "name": "updated_at",
+ "type_info": "Timestamptz"
+ }
+ ],
+ "parameters": {
+ "Left": [
+ "Text",
+ "Text",
+ "Int8"
+ ]
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "48a65761a26b8bfae728cc17754849ec1c1593179da57b1f87bc9a18860ad75f"
+}
diff --git a/.sqlx/query-6474a8297762d8df4bc7d09d837cfa2daa0110377d70386b7788d6cd20052a90.json b/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json
index a46ff8f..0453107 100644
--- a/.sqlx/query-6474a8297762d8df4bc7d09d837cfa2daa0110377d70386b7788d6cd20052a90.json
+++ b/.sqlx/query-502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3.json
@@ -1,16 +1,16 @@
{
"db_name": "PostgreSQL",
- "query": "insert into transaction_schema (type, version, json_schema) values ($1, $2, $3)\n returning\n type as kind, \n version, \n json_schema as schema, \n created_at, \n updated_at\n ",
+ "query": "insert into transaction_schema (schema_type, schema_version, schema) values ($1, $2, $3)\n returning *\n ",
"describe": {
"columns": [
{
"ordinal": 0,
- "name": "kind",
+ "name": "schema_type",
"type_info": "Text"
},
{
"ordinal": 1,
- "name": "version",
+ "name": "schema_version",
"type_info": "Varchar"
},
{
@@ -44,5 +44,5 @@
false
]
},
- "hash": "6474a8297762d8df4bc7d09d837cfa2daa0110377d70386b7788d6cd20052a90"
+ "hash": "502a6d7fb354206edfeabe452d5705efb0ed3ac5bfaa362be8ef615a2ee5f4a3"
}
diff --git a/.sqlx/query-e9e130d0192838e5321e0bbd42a3ffa93d3dd52e4d8736551d328d6a36b9f482.json b/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json
index 7a35bab..5e32b25 100644
--- a/.sqlx/query-e9e130d0192838e5321e0bbd42a3ffa93d3dd52e4d8736551d328d6a36b9f482.json
+++ b/.sqlx/query-5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344.json
@@ -1,16 +1,16 @@
{
"db_name": "PostgreSQL",
- "query": "select \n type as kind, \n version, \n json_schema as schema, \n created_at, \n updated_at\n from transaction_schema where type = $1 and version = $2",
+ "query": "select \n *\n from transaction_schema where schema_type = $1 and schema_version = $2",
"describe": {
"columns": [
{
"ordinal": 0,
- "name": "kind",
+ "name": "schema_type",
"type_info": "Text"
},
{
"ordinal": 1,
- "name": "version",
+ "name": "schema_version",
"type_info": "Varchar"
},
{
@@ -43,5 +43,5 @@
false
]
},
- "hash": "e9e130d0192838e5321e0bbd42a3ffa93d3dd52e4d8736551d328d6a36b9f482"
+ "hash": "5a3f8243d2641834e14a9c6888d54f6c5fbfeec32eb9fd3836251b26ecc97344"
}
diff --git a/.sqlx/query-8911a00307267a402475c286f765d23f1e3b87538591837bfe41fff03e7313dd.json b/.sqlx/query-c97114552378d59c671960f1be916c588b74ad726f0a8308ea692d20a8558888.json
index 99d50b9..401e6e6 100644
--- a/.sqlx/query-8911a00307267a402475c286f765d23f1e3b87538591837bfe41fff03e7313dd.json
+++ b/.sqlx/query-c97114552378d59c671960f1be916c588b74ad726f0a8308ea692d20a8558888.json
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
- "query": "delete from transaction_schema where type = $1 and version = $2",
+ "query": "delete from transaction_schema where schema_type = $1 and schema_version = $2",
"describe": {
"columns": [],
"parameters": {
@@ -11,5 +11,5 @@
},
"nullable": []
},
- "hash": "8911a00307267a402475c286f765d23f1e3b87538591837bfe41fff03e7313dd"
+ "hash": "c97114552378d59c671960f1be916c588b74ad726f0a8308ea692d20a8558888"
}
diff --git a/Cargo.lock b/Cargo.lock
index c2eaea9..442384b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1151,9 +1151,9 @@ checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2"
[[package]]
name = "iri-string"
-version = "0.7.11"
+version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8e7418f59cc01c88316161279a7f665217ae316b388e58a0d10e29f54f1e5eb"
+checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20"
dependencies = [
"memchr",
"serde",
@@ -3072,6 +3072,7 @@ dependencies = [
"anyhow",
"api-config",
"axum",
+ "base64",
"clap",
"jsonschema",
"secrecy",
diff --git a/lib/api-config/src/schema/create.rs b/lib/api-config/src/schema/create.rs
index eef11f8..e6511d5 100644
--- a/lib/api-config/src/schema/create.rs
+++ b/lib/api-config/src/schema/create.rs
@@ -2,17 +2,16 @@ use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
-/// Transaction to monitor
#[cfg_attr(feature = "utoipa", schema(example = json!({
- "type": "custom.schema",
- "version": "1.0.0",
- "json_schema": {
+ "schemaType": "custom.schema",
+ "schemaVersion": "1.0.0",
+ "schema": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "FinancialTransaction",
"type": "object",
- "required": ["transaction_id", "amount", "currency", "timestamp"],
+ "required": ["transactionId", "amount", "currency", "timestamp"],
"properties": {
- "transaction_id": {
+ "transactionId": {
"type": "string",
"format": "uuid"
},
@@ -32,13 +31,13 @@ use serde::{Deserialize, Serialize};
}
}
})))]
+#[serde(rename_all = "camelCase")]
+/// The json schema to validate for each transaction of this type and version
pub struct CreateSchema {
- #[serde(rename = "type")]
/// Transaction schema type
- pub kind: String,
+ pub schema_type: String,
/// The schema's version
- pub version: String,
- /// The json schema to validate for each transaction of this type and version
- #[serde(rename = "json_schema")]
+ pub schema_version: String,
+ /// The json schema
pub schema: serde_json::Value,
}
diff --git a/lib/api-config/src/schema/mod.rs b/lib/api-config/src/schema/mod.rs
index d1ef5f4..7c65d49 100644
--- a/lib/api-config/src/schema/mod.rs
+++ b/lib/api-config/src/schema/mod.rs
@@ -1,26 +1,28 @@
mod create;
pub use create::CreateSchema;
+use tracing::debug;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
use time::OffsetDateTime;
use warden_core::state::AppState;
use crate::ConfigurationError;
/// Transaction to monitor
-#[derive(Deserialize, Serialize)]
+#[derive(Deserialize, Debug, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "utoipa", schema(example = json!({
- "type": "custom.schema",
- "version": "1.0.0",
- "json_schema": {
+ "schemaType": "custom.schema",
+ "schemaVersion": "1.0.0",
+ "schema": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "FinancialTransaction",
"type": "object",
- "required": ["transaction_id", "amount", "currency", "timestamp"],
+ "required": ["transactionId", "amount", "currency", "timestamp"],
"properties": {
- "transaction_id": {
+ "transactionId": {
"type": "string",
"format": "uuid"
},
@@ -39,20 +41,21 @@ use crate::ConfigurationError;
},
}
},
- "created_at": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(),
- "updated_at": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(),
+ "createdAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(),
+ "updatedAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(),
})))]
+#[serde(rename_all = "camelCase")]
pub struct TransactionSchema {
- #[serde(rename = "type")]
/// Transaction schema type
- pub kind: String,
+ pub schema_type: String,
/// The schema's version
- pub version: String,
+ pub schema_version: String,
/// JSON schema for transcation
- #[serde(rename = "json_schema")]
pub schema: serde_json::Value,
+ /// When the schema was created
#[serde(with = "time::serde::rfc3339")]
pub created_at: OffsetDateTime,
+ /// When the schema was last updated
#[serde(with = "time::serde::rfc3339")]
pub updated_at: OffsetDateTime,
}
@@ -61,49 +64,53 @@ pub struct TransactionSchema {
pub trait SchemaDriver {
async fn create_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
schema: &serde_json::Value,
) -> Result<TransactionSchema, ConfigurationError>;
async fn delete_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
) -> Result<(), ConfigurationError>;
async fn get_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
) -> Result<Option<TransactionSchema>, ConfigurationError>;
async fn update_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
schema: &serde_json::Value,
- ) -> Result<TransactionSchema, ConfigurationError>;
+ ) -> Result<Option<TransactionSchema>, ConfigurationError>;
+
+ async fn get_schemas(
+ &self,
+ limit: i64,
+ first: Option<i64>,
+ after: Option<impl AsRef<str> + Send + Sync + Debug>,
+ ) -> Result<Vec<TransactionSchema>, ConfigurationError>;
}
#[async_trait]
impl SchemaDriver for AppState {
+ #[tracing::instrument(skip(self, schema))]
async fn create_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
schema: &serde_json::Value,
) -> Result<TransactionSchema, crate::ConfigurationError> {
+ debug!("creating transaction schema");
sqlx::query_as!(
TransactionSchema,
- "insert into transaction_schema (type, version, json_schema) values ($1, $2, $3)
- returning
- type as kind,
- version,
- json_schema as schema,
- created_at,
- updated_at
- ",
+ "insert into transaction_schema (schema_type, schema_version, schema) values ($1, $2, $3)
+ returning *
+ ",
kind.as_ref(),
version.as_ref(),
sqlx::types::Json(&schema) as _
@@ -113,13 +120,15 @@ impl SchemaDriver for AppState {
.map_err(|e| e.into())
}
+ #[tracing::instrument(skip(self))]
async fn delete_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
) -> Result<(), crate::ConfigurationError> {
+ debug!("deleting transaction schema");
sqlx::query!(
- "delete from transaction_schema where type = $1 and version = $2",
+ "delete from transaction_schema where schema_type = $1 and schema_version = $2",
kind.as_ref(),
version.as_ref(),
)
@@ -128,20 +137,18 @@ impl SchemaDriver for AppState {
Ok(())
}
+ #[tracing::instrument(skip(self))]
async fn get_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
) -> Result<Option<TransactionSchema>, crate::ConfigurationError> {
+ debug!("getting transaction schema");
let result = sqlx::query_as!(
TransactionSchema,
"select
- type as kind,
- version,
- json_schema as schema,
- created_at,
- updated_at
- from transaction_schema where type = $1 and version = $2",
+ *
+ from transaction_schema where schema_type = $1 and schema_version = $2",
kind.as_ref(),
version.as_ref(),
)
@@ -151,34 +158,71 @@ impl SchemaDriver for AppState {
Ok(result)
}
+ #[tracing::instrument(skip(self, schema))]
async fn update_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
schema: &serde_json::Value,
- ) -> Result<TransactionSchema, crate::ConfigurationError> {
- sqlx::query_as!(TransactionSchema,
- "
+ ) -> Result<Option<TransactionSchema>, crate::ConfigurationError> {
+ debug!("updating transaction schema");
+ sqlx::query_as!(
+ TransactionSchema,
+ "
update
transaction_schema
set
- json_schema = $3
+ schema = $3
where
- type = $1
- and version = $2
- returning
- type as kind,
- version,
- json_schema as schema,
- created_at,
- updated_at
+ schema_type = $1
+ and schema_version = $2
+ returning *
",
kind.as_ref(),
version.as_ref(),
sqlx::types::Json(&schema) as _
)
- .fetch_one(&self.database)
+ .fetch_optional(&self.database)
.await
.map_err(|e| e.into())
}
+
+ #[tracing::instrument(skip(self))]
+ async fn get_schemas(
+ &self,
+ limit: i64,
+ first: Option<i64>,
+ after: Option<impl AsRef<str> + Send + Sync + Debug>,
+ ) -> Result<Vec<TransactionSchema>, ConfigurationError> {
+ debug!("getting transaction schemas");
+ let limit = first.unwrap_or(limit);
+ let mut last_type = String::default();
+ let mut last_version = String::default();
+
+ if let Some(s) = after {
+ let parts: Vec<&str> = s.as_ref().split(',').collect();
+ if parts.len() == 2 {
+ last_type = parts[0].to_string();
+ last_version = parts[1].to_string();
+ }
+ }
+
+ let rows = sqlx::query_as!(
+ TransactionSchema,
+ "
+ select *
+ from transaction_schema
+ where ($1 = '' or (schema_type, schema_version) > ($1, $2))
+ order by schema_type asc, schema_version asc
+ limit $3
+ ",
+ &last_type,
+ &last_version,
+ limit + 1
+ )
+ .fetch_all(&self.database)
+ .await?;
+
+ Ok(rows)
+ }
}
diff --git a/lib/warden-core/src/config/cli/mod.rs b/lib/warden-core/src/config/cli/mod.rs
index 36f6bf0..e0c5450 100644
--- a/lib/warden-core/src/config/cli/mod.rs
+++ b/lib/warden-core/src/config/cli/mod.rs
@@ -65,7 +65,7 @@ impl Default for Server {
port: Some(2210),
environment: Default::default(),
log_level: Some(format!(
- "{}=debug,tower_http=debug,axum::rejection=trace",
+ "{}=debug,tower_http=debug,axum::rejection=trace,sqlx=warn,debug",
env!("CARGO_CRATE_NAME")
)),
log_dir: Some(std::env::temp_dir()),
diff --git a/migrations/20260329120645_transaction_schema.sql b/migrations/20260329120645_transaction_schema.sql
index d7b4744..8496c80 100644
--- a/migrations/20260329120645_transaction_schema.sql
+++ b/migrations/20260329120645_transaction_schema.sql
@@ -2,14 +2,14 @@
-- The transaction's blueprint to be checked on each request
create table transaction_schema (
-- The transaction type
- type text not null,
+ schema_type text not null,
-- The schema's version (to allow for multiple revisions - maybe)
- version varchar not null,
+ schema_version varchar not null,
-- Actual JSON schema
- json_schema jsonb not null,
+ schema jsonb not null,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
- primary key (type, version)
+ primary key (schema_type, schema_version)
);
create trigger update_transaction_schema_modtime
diff --git a/warden/Cargo.toml b/warden/Cargo.toml
index 1b7c42a..e373fb8 100644
--- a/warden/Cargo.toml
+++ b/warden/Cargo.toml
@@ -9,6 +9,7 @@ homepage.workspace = true
[dependencies]
anyhow = "1.0.102"
axum = { version = "0.8.8", features = ["macros"] }
+base64 = "0.22.1"
clap.workspace = true
jsonschema.workspace = true
secrecy = { version = "0.10.3", features = ["serde"] }
diff --git a/warden/src/server/api/mod.rs b/warden/src/server/api/mod.rs
index 963a3ef..f5aa0c8 100644
--- a/warden/src/server/api/mod.rs
+++ b/warden/src/server/api/mod.rs
@@ -1,3 +1,4 @@
+pub mod pagination;
pub mod transaction;
pub mod version;
diff --git a/warden/src/server/api/pagination.rs b/warden/src/server/api/pagination.rs
new file mode 100644
index 0000000..7341d2f
--- /dev/null
+++ b/warden/src/server/api/pagination.rs
@@ -0,0 +1,21 @@
+use api_config::schema::TransactionSchema;
+use serde::{Deserialize, Serialize};
+use utoipa::{IntoParams, ToSchema};
+
+#[derive(Deserialize, Debug, IntoParams, ToSchema)]
+pub struct PaginationParams {
+ pub first: Option<i64>,
+ pub after: Option<String>, // This is our Base64 cursor
+}
+
+#[derive(Serialize, ToSchema)]
+pub struct PageInfo {
+ pub has_next_page: bool,
+ pub end_cursor: Option<String>,
+}
+
+#[derive(Serialize, ToSchema)]
+pub struct RelayResponse {
+ pub nodes: Vec<TransactionSchema>,
+ pub page_info: PageInfo,
+}
diff --git a/warden/src/server/api/transaction.rs b/warden/src/server/api/transaction.rs
index 9524354..03f8a1f 100644
--- a/warden/src/server/api/transaction.rs
+++ b/warden/src/server/api/transaction.rs
@@ -5,12 +5,10 @@ use utoipa::ToSchema;
/// Transaction to monitor
pub struct Transaction {
- #[serde(rename = "type")]
/// Transaction schema type
- pub kind: String,
+ pub schema_type: String,
/// The schema's version
- pub version: String,
+ pub schema_version: String,
/// Transaction data
- #[serde(rename = "json_schema")]
pub transaction: serde_json::Value,
}
diff --git a/warden/src/server/api/version.rs b/warden/src/server/api/version.rs
index 32cfef3..f8d856a 100644
--- a/warden/src/server/api/version.rs
+++ b/warden/src/server/api/version.rs
@@ -9,9 +9,10 @@ use axum::{
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};
-#[derive(Deserialize,Debug, IntoParams)]
+#[derive(Deserialize, Debug, IntoParams)]
+#[serde(rename_all = "camelCase")]
pub struct VersionPath {
- pub version: Version,
+ pub api_version: Version,
}
#[derive(Debug, ToSchema, Deserialize, Serialize)]
@@ -32,7 +33,7 @@ where
parts.extract().await.map_err(IntoResponse::into_response)?;
let version = params
- .get("version")
+ .get("apiVersion")
.ok_or_else(|| (StatusCode::NOT_FOUND, "version param missing").into_response())?;
match version.as_str() {
diff --git a/warden/src/server/middleware/extractors/transaction.rs b/warden/src/server/middleware/extractors/transaction.rs
index 8d02f6e..cdee029 100644
--- a/warden/src/server/middleware/extractors/transaction.rs
+++ b/warden/src/server/middleware/extractors/transaction.rs
@@ -21,7 +21,7 @@ where
.await
.map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?;
- let schema_json = get_schema_from_db(&payload.kind)
+ let schema_json = get_schema_from_db(&payload.schema_type)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid schema key".to_string()))?;
let validator = Validator::new(&schema_json).map_err(|_| {
diff --git a/warden/src/server/routes/config/logs.rs b/warden/src/server/routes/config/logs.rs
index 45959c2..67bd14b 100644
--- a/warden/src/server/routes/config/logs.rs
+++ b/warden/src/server/routes/config/logs.rs
@@ -9,14 +9,9 @@ use crate::server::{api::SafeJson, routes::config::CONFIG};
#[derive(Deserialize, Debug, Clone, ToSchema)]
/// Log level
+#[serde(rename_all = "camelCase")]
pub struct LogLevel {
- #[schema(
- examples(
- "info",
- "trace",
- "warden=debug,tower_http=debug,axum::rejection=trace"
- )
- )]
+ #[schema(examples("info", "trace", "warden=debug,tower_http=debug,axum::rejection=trace"))]
log_level: String,
}
diff --git a/warden/src/server/routes/config/schema/create.rs b/warden/src/server/routes/config/schema/create.rs
index b9a3b22..b2d3051 100644
--- a/warden/src/server/routes/config/schema/create.rs
+++ b/warden/src/server/routes/config/schema/create.rs
@@ -10,7 +10,11 @@ use axum::{
use tracing::{info, trace};
use warden_core::state::AppState;
-use crate::server::{api::version::{Version, VersionPath}, error::AppError, routes::config::CONFIG};
+use crate::server::{
+ api::version::{Version, VersionPath},
+ error::AppError,
+ routes::config::CONFIG,
+};
/// Save a transaction's schema
#[utoipa::path(
@@ -59,7 +63,7 @@ use crate::server::{api::version::{Version, VersionPath}, error::AppError, route
content = CreateSchema
),
params(VersionPath),
- path = "/{version}/config/schema"
+ path = "/{apiVersion}/config/schema"
)]
#[tracing::instrument(
name = "create_schema",
@@ -67,7 +71,8 @@ use crate::server::{api::version::{Version, VersionPath}, error::AppError, route
fields(
request_id = %headers.get("x-request-id")
.and_then(|v| v.to_str().ok()).expect("request id"),
- kind = %body.kind,
+ schema_type = %body.schema_type,
+ schema_ver = %body.schema_version,
)
)]
#[debug_handler]
@@ -87,7 +92,7 @@ pub async fn create_schema(
info!("schema is valid. trying to save...");
let result = state
- .create_schema(&body.kind, &body.version, &body.schema)
+ .create_schema(&body.schema_type, &body.schema_version, &body.schema)
.await
.map_err(|e| match e {
api_config::ConfigurationError::Database(ref error) => match error {
diff --git a/warden/src/server/routes/config/schema/delete.rs b/warden/src/server/routes/config/schema/delete.rs
index cfafb68..a761203 100644
--- a/warden/src/server/routes/config/schema/delete.rs
+++ b/warden/src/server/routes/config/schema/delete.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use api_config::schema::{SchemaDriver};
+use api_config::schema::SchemaDriver;
use axum::{
debug_handler,
extract::{Path, Query, State},
@@ -11,18 +11,22 @@ use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};
use warden_core::state::AppState;
-use crate::server::{api::version::{Version, VersionPath}, error::AppError, routes::config::CONFIG};
+use crate::server::{
+ api::version::{Version, VersionPath},
+ error::AppError,
+ routes::config::CONFIG,
+};
/// Schema delete query
#[derive(Deserialize, Serialize, IntoParams, ToSchema)]
+#[serde(rename_all = "camelCase")]
pub struct SchemaDeleteQuery {
/// Schema type
- #[serde(rename = "type")]
#[param(example = "custom.schema")]
- kind: String,
+ schema_type: String,
/// Schema version
#[param(example = "1.0.0")]
- version: String,
+ schema_version: String,
}
/// Delete a transaction's schema
@@ -60,7 +64,7 @@ pub struct SchemaDeleteQuery {
),
operation_id = "delete_schema", // https://github.com/juhaku/utoipa/issues/1170
tag = CONFIG,
- path = "/{version}/config/schema",
+ path = "/{apiVersion}/config/schema",
params(VersionPath, SchemaDeleteQuery),
)]
#[tracing::instrument(
@@ -69,7 +73,8 @@ pub struct SchemaDeleteQuery {
fields(
request_id = %headers.get("x-request-id")
.and_then(|v| v.to_str().ok()).expect("request id"),
- kind = %body.kind,
+ schema_type = %body.schema_type,
+ schema_ver = %body.schema_version,
)
)]
#[debug_handler]
@@ -81,7 +86,7 @@ pub async fn delete_schema(
) -> Result<impl IntoResponse, AppError> {
// TODO: should also clear cached ones eventually
state
- .delete_schema(&body.kind, &body.version)
+ .delete_schema(&body.schema_type, &body.schema_version)
.await?;
Ok(StatusCode::NO_CONTENT)
}
diff --git a/warden/src/server/routes/config/schema/mod.rs b/warden/src/server/routes/config/schema/mod.rs
index 6ab66a1..3fbdb81 100644
--- a/warden/src/server/routes/config/schema/mod.rs
+++ b/warden/src/server/routes/config/schema/mod.rs
@@ -13,6 +13,7 @@ pub fn router(store: Arc<AppState>) -> OpenApiRouter {
.routes(utoipa_axum::routes!(create::create_schema))
.routes(utoipa_axum::routes!(delete::delete_schema))
.routes(utoipa_axum::routes!(read::get_schema))
+ .routes(utoipa_axum::routes!(read::get_schemas))
.routes(utoipa_axum::routes!(update::update_schema))
.with_state(store)
}
diff --git a/warden/src/server/routes/config/schema/read.rs b/warden/src/server/routes/config/schema/read.rs
index 17fc3e2..1e87626 100644
--- a/warden/src/server/routes/config/schema/read.rs
+++ b/warden/src/server/routes/config/schema/read.rs
@@ -7,24 +7,18 @@ use axum::{
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
-use serde::{Deserialize, Serialize};
+use base64::{Engine, engine::general_purpose};
use tracing::debug;
-use utoipa::{IntoParams, ToSchema};
use warden_core::state::AppState;
-use crate::server::{api::version::{Version, VersionPath}, error::AppError, routes::config::CONFIG};
-
-/// Schema search query
-#[derive(Deserialize, Serialize, IntoParams, ToSchema)]
-pub struct SchemaSearchQuery {
- /// Schema type
- #[serde(rename = "type")]
- #[param(example = "custom.schema")]
- kind: String,
- /// Schema version
- #[param(example = "1.0.0")]
- version: String,
-}
+use crate::server::{
+ api::{
+ pagination::{PageInfo, PaginationParams, RelayResponse},
+ version::{Version, VersionPath},
+ },
+ error::AppError,
+ routes::config::CONFIG,
+};
/// Get a transaction's schema
#[utoipa::path(
@@ -34,7 +28,7 @@ pub struct SchemaSearchQuery {
status = 200,
description = "Lookup results",
headers(
- ("x-request-id", description = "Request identifier")
+ ("x-request-id" = Uuid, description = "Request identifier")
),
body = Option<TransactionSchema>
),
@@ -42,56 +36,60 @@ pub struct SchemaSearchQuery {
status = 400,
description = "Invalid request",
headers(
- ("x-request-id", description = "Request identifier")
+ ("x-request-id" = Uuid, description = "Request identifier")
)
),
(
status = 404,
description = "Schema not found",
headers(
- ("x-request-id", description = "Request identifier")
+ ("x-request-id" = Uuid, description = "Request identifier")
)
),
(
status = 405,
description = "Method not allowed",
headers(
- ("x-request-id", description = "Request identifier")
+ ("x-request-id" = Uuid, description = "Request identifier")
)
),
(
status = 500,
description = "Internal server error",
headers(
- ("x-request-id", description = "Request identifier")
+ ("x-request-id" = Uuid, description = "Request identifier")
)
)
),
operation_id = "get_schema", // https://github.com/juhaku/utoipa/issues/1170
tag = CONFIG,
- path = "/{version}/config/schema",
- params(VersionPath, SchemaSearchQuery),
+ path = "/{apiVersion}/config/schema/{schemaType}/{schemaVersion}",
+ params(
+ VersionPath,
+ ("schemaType" = String, Path, example = "custom.schema"),
+ ("schemaVersion" = String, Path, example = "1.0.0"),
+ ),
)]
#[tracing::instrument(
name = "get_schema",
- skip(state, headers, body),
+ skip(state, headers),
fields(
request_id = %headers.get("x-request-id")
.and_then(|v| v.to_str().ok()).expect("request id"),
- kind = %body.kind,
+ schema_type = %schema_type,
+ schema_ver = %schema_version,
)
)]
#[debug_handler]
pub async fn get_schema(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
- Path(version): Path<Version>,
- body: Query<SchemaSearchQuery>,
+ Path((version, schema_type, schema_version)): Path<(Version, String, String)>,
) -> Result<impl IntoResponse, AppError> {
debug!("searching for schema");
// TODO: get from cache
let result = state
- .get_schema(&body.kind, &body.version)
+ .get_schema(&schema_type, &schema_version)
.await
.map_err(|e| match e {
api_config::ConfigurationError::Database(ref error) => match error {
@@ -114,3 +112,109 @@ pub async fn get_schema(
))
}
}
+
+/// Get transaction schemas
+#[utoipa::path(
+ get,
+ responses(
+ (
+ status = 200,
+ description = "Lookup results",
+ headers(
+ ("x-request-id" = Uuid, description = "Request identifier")
+ ),
+ body = RelayResponse
+ ),
+ (
+ status = 400,
+ description = "Invalid request",
+ headers(
+ ("x-request-id" = Uuid, description = "Request identifier")
+ )
+ ),
+ (
+ status = 404,
+ description = "Schema not found",
+ headers(
+ ("x-request-id" = Uuid, description = "Request identifier")
+ )
+ ),
+ (
+ status = 405,
+ description = "Method not allowed",
+ headers(
+ ("x-request-id" = Uuid, description = "Request identifier")
+ )
+ ),
+ (
+ status = 500,
+ description = "Internal server error",
+ headers(
+ ("x-request-id" = Uuid, description = "Request identifier")
+ )
+ )
+ ),
+ operation_id = "get_schemas", // https://github.com/juhaku/utoipa/issues/1170
+ tag = CONFIG,
+ path = "/{apiVersion}/config/schema",
+ params(VersionPath),
+)]
+#[tracing::instrument(
+ name = "get_schemas",
+ skip(state, headers),
+ fields(
+ request_id = %headers.get("x-request-id")
+ .and_then(|v| v.to_str().ok()).expect("request id"),
+ )
+)]
+#[debug_handler]
+pub async fn get_schemas(
+ State(state): State<Arc<AppState>>,
+ headers: HeaderMap,
+ Path(version): Path<Version>,
+ params: Query<PaginationParams>,
+) -> Result<impl IntoResponse, AppError> {
+ debug!("searching for schema");
+ let mut cursor = None;
+ let limit = 10;
+
+ // 1. Decode Cursor
+ if let Some(ref cursor_str) = params.after {
+ let decoded = general_purpose::STANDARD.decode(cursor_str).unwrap();
+ cursor = Some(String::from_utf8(decoded).unwrap());
+ }
+
+ // TODO: get from cache
+ let rows = state
+ .get_schemas(limit, params.first, cursor)
+ .await
+ .map_err(|e| match e {
+ api_config::ConfigurationError::Database(ref error) => match error {
+ sqlx::Error::Database(db_err) if db_err.code() == Some("23505".into()) => {
+ AppError::new(
+ StatusCode::CONFLICT,
+ anyhow::anyhow!("Transaction schema already exists"),
+ )
+ }
+ _ => e.into(),
+ },
+ _ => e.into(),
+ })?;
+
+ // 3. Process Relay Metadata
+ let has_next_page = rows.len() > limit as usize;
+ let nodes: Vec<TransactionSchema> = rows.into_iter().take(limit as usize).collect();
+
+ let end_cursor = nodes.last().map(|node| {
+ let raw_cursor = format!("{},{}", node.schema_type, node.schema_version);
+ general_purpose::STANDARD.encode(raw_cursor)
+ });
+
+ Ok(Json(RelayResponse {
+ nodes,
+ page_info: PageInfo {
+ has_next_page,
+ end_cursor,
+ },
+ }))
+}
diff --git a/warden/src/server/routes/config/schema/update.rs b/warden/src/server/routes/config/schema/update.rs
index a03e8e8..f116e0f 100644
--- a/warden/src/server/routes/config/schema/update.rs
+++ b/warden/src/server/routes/config/schema/update.rs
@@ -1,15 +1,19 @@
use std::sync::Arc;
-use api_config::schema::{CreateSchema, SchemaDriver};
+use api_config::schema::{CreateSchema, SchemaDriver, TransactionSchema};
use axum::{
Json, debug_handler,
- extract::State,
+ extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use warden_core::state::AppState;
-use crate::server::{api::version::Version, error::AppError, routes::config::CONFIG};
+use crate::server::{
+ api::version::{Version, VersionPath},
+ error::AppError,
+ routes::config::CONFIG,
+};
/// Update a transaction's schema
#[utoipa::path(
@@ -17,30 +21,38 @@ use crate::server::{api::version::Version, error::AppError, routes::config::CONF
responses(
(
status = 200,
- description = "The schema has been deleted",
+ description = "The updated schema",
headers(
- ("x-request-id", description = "Request identifier")
- )
+ ("x-request-id" = Uuid, description = "Request identifier")
+ ),
+ body = TransactionSchema,
),
(
status = 400,
description = "Invalid request",
headers(
- ("x-request-id", description = "Request identifier")
+ ("x-request-id" = Uuid, description = "Request identifier")
+ )
+ ),
+ (
+ status = 404,
+ description = "No schema found to update",
+ headers(
+ ("x-request-id" = Uuid, description = "Request identifier")
)
),
(
status = 405,
description = "Method not allowed",
headers(
- ("x-request-id", description = "Request identifier")
+ ("x-request-id" = Uuid, description = "Request identifier")
)
),
(
status = 500,
description = "Internal server error",
headers(
- ("x-request-id", description = "Request identifier")
+ ("x-request-id" = Uuid, description = "Request identifier")
)
)
),
@@ -49,29 +61,40 @@ use crate::server::{api::version::Version, error::AppError, routes::config::CONF
request_body(
content = CreateSchema
),
- path = "/{version}/config/schema",
+ path = "/{apiVersion}/config/schema/{schemaType}/{schemaVersion}",
params(
- ("version" = Version, Path, description = "API version, e.g., v1, v2, v3")
+ VersionPath,
+ ("schemaType" = String, Path, example = "custom.schema"),
+ ("schemaVersion" = String, Path, example = "1.0.0"),
),
)]
#[tracing::instrument(
- name = "delete_schema",
skip(state, headers, body),
fields(
request_id = %headers.get("x-request-id")
.and_then(|v| v.to_str().ok()).expect("request id"),
- kind = %body.kind,
+ schema_type = %schema_type,
+ schema_ver = %schema_version,
)
)]
#[debug_handler]
pub async fn update_schema(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
- Json(body): Json<CreateSchema>,
+ Path((version, schema_type, schema_version)): Path<(Version, String, String)>,
+ Json(body): Json<serde_json::Value>,
) -> Result<impl IntoResponse, AppError> {
// TODO: should also clear cached ones eventually
- state
- .update_schema(&body.kind, &body.version, &body.schema)
+ let result = state
+ .update_schema(schema_type, schema_version, &body)
.await?;
- Ok(StatusCode::OK)
+
+ if let Some(result) = result {
+ Ok(Json(result))
+ } else {
+ Err(AppError::new(
+ StatusCode::NOT_FOUND,
+ anyhow::anyhow!("Resource not found"),
+ ))
+ }
}
diff --git a/warden/src/server/routes/transaction_monitoring/monitor.rs b/warden/src/server/routes/transaction_monitoring/monitor.rs
index 0c60117..817dbca 100644
--- a/warden/src/server/routes/transaction_monitoring/monitor.rs
+++ b/warden/src/server/routes/transaction_monitoring/monitor.rs
@@ -1,10 +1,16 @@
use std::sync::Arc;
use crate::server::{
- api::{transaction::Transaction, version::Version},
+ api::{
+ transaction::Transaction,
+ version::{Version, VersionPath},
+ },
middleware::extractors::transaction::ValidatedTransaction,
};
-use axum::{extract::State, http::StatusCode};
+use axum::{
+ extract::{Path, State},
+ http::StatusCode,
+};
use warden_core::state::AppState;
/// Submit a transaction for monitoring
@@ -38,9 +44,9 @@ use warden_core::state::AppState;
request_body(
content = LogLevel
),
- path = "/{version}/monitor",
+ path = "/{apiVersion}/monitor",
params(
- ("version" = Version, Path, description = "API version, e.g., v1, v2, v3")
+ VersionPath
),
request_body(
content = Transaction
@@ -49,6 +55,7 @@ use warden_core::state::AppState;
]
pub async fn reload(
State(state): State<Arc<AppState>>,
+ Path(version): Path<Version>,
ValidatedTransaction(body): ValidatedTransaction<serde_json::Value>,
) -> StatusCode {
dbg!(&body);