mod create; pub use create::CreateSchema; use sqlx::PgPool; use tracing::debug; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use time::OffsetDateTime; use crate::ConfigurationError; /// Transaction to monitor #[derive(Deserialize, Debug, Serialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] #[cfg_attr(feature = "utoipa", schema(example = json!({ "schemaType": "custom.schema", "schemaVersion": "1.0.0", "schema": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "FinancialTransaction", "type": "object", "required": ["transactionId", "amount", "currency", "timestamp"], "properties": { "transactionId": { "type": "string", "format": "uuid" }, "amount": { "type": "number", "exclusiveMinimum": 0 }, "currency": { "type": "string", "pattern": "^[A-Z]{3}$", "description": "ISO 4217 Alpha-3 code (e.g., USD, EUR)" }, "timestamp": { "type": "string", "format": "date-time" }, } }, "createdAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(), "updatedAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(), })))] #[serde(rename_all = "camelCase")] pub struct TransactionSchema { /// Transaction schema type pub schema_type: String, /// The schema's version pub schema_version: String, /// JSON schema for transcation pub schema: serde_json::Value, /// When the schema was created #[serde(with = "time::serde::rfc3339")] pub created_at: OffsetDateTime, /// When the schema was last updated #[serde(with = "time::serde::rfc3339")] pub updated_at: OffsetDateTime, } pub struct SchemaService { pub database: PgPool, } #[async_trait] pub trait SchemaDriver: Send + Sync { async fn create_schema( &self, kind: &str, version: &str, schema: &serde_json::Value, ) -> Result; async fn delete_schema(&self, kind: &str, version: &str) -> Result<(), ConfigurationError>; async fn get_schema( &self, kind: &str, version: &str, ) -> Result, ConfigurationError>; async fn update_schema( &self, kind: &str, version: &str, schema: &serde_json::Value, ) -> Result, ConfigurationError>; async fn get_schemas( &self, limit: i64, first: Option, after: Option<&str>, ) -> Result, ConfigurationError>; } #[async_trait] impl SchemaDriver for SchemaService { #[tracing::instrument(skip(self, schema))] async fn create_schema( &self, kind: &str, version: &str, schema: &serde_json::Value, ) -> Result { debug!("creating transaction schema"); sqlx::query_as!( TransactionSchema, "insert into transaction_schema (schema_type, schema_version, schema) values ($1, $2, $3) returning * ", kind, version, sqlx::types::Json(&schema) as _ ) .fetch_one(&self.database) .await .map_err(|e| e.into()) } #[tracing::instrument(skip(self))] async fn delete_schema( &self, kind: &str, version: &str, ) -> Result<(), crate::ConfigurationError> { debug!("deleting transaction schema"); sqlx::query!( "delete from transaction_schema where schema_type = $1 and schema_version = $2", kind, version, ) .execute(&self.database) .await?; Ok(()) } #[tracing::instrument(skip(self))] async fn get_schema( &self, kind: &str, version: &str, ) -> Result, crate::ConfigurationError> { debug!("getting transaction schema"); let result = sqlx::query_as!( TransactionSchema, "select * from transaction_schema where schema_type = $1 and schema_version = $2", kind, version, ) .fetch_optional(&self.database) .await?; Ok(result) } #[tracing::instrument(skip(self, schema))] async fn update_schema( &self, kind: &str, version: &str, schema: &serde_json::Value, ) -> Result, crate::ConfigurationError> { debug!("updating transaction schema"); sqlx::query_as!( TransactionSchema, " update transaction_schema set schema = $3 where schema_type = $1 and schema_version = $2 returning * ", kind, version, sqlx::types::Json(&schema) as _ ) .fetch_optional(&self.database) .await .map_err(|e| e.into()) } #[tracing::instrument(skip(self))] async fn get_schemas( &self, limit: i64, first: Option, after: Option<&str>, ) -> Result, ConfigurationError> { debug!("getting transaction schemas"); let limit = first.unwrap_or(limit); let mut last_type = String::default(); let mut last_version = String::default(); if let Some(s) = after { let parts: Vec<&str> = s.split(',').collect(); if parts.len() == 2 { last_type = parts[0].to_string(); last_version = parts[1].to_string(); } } let rows = sqlx::query_as!( TransactionSchema, " select * from transaction_schema where ($1 = '' or (schema_type, schema_version) > ($1, $2)) order by schema_type asc, schema_version asc limit $3 ", &last_type, &last_version, limit + 1 ) .fetch_all(&self.database) .await?; Ok(rows) } }