aboutsummaryrefslogtreecommitdiffstats
path: root/lib/api-config/src/schema/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/api-config/src/schema/mod.rs')
-rw-r--r--lib/api-config/src/schema/mod.rs158
1 files changed, 101 insertions, 57 deletions
diff --git a/lib/api-config/src/schema/mod.rs b/lib/api-config/src/schema/mod.rs
index d1ef5f4..7c65d49 100644
--- a/lib/api-config/src/schema/mod.rs
+++ b/lib/api-config/src/schema/mod.rs
@@ -1,26 +1,28 @@
mod create;
pub use create::CreateSchema;
+use tracing::debug;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
use time::OffsetDateTime;
use warden_core::state::AppState;
use crate::ConfigurationError;
/// Transaction to monitor
-#[derive(Deserialize, Serialize)]
+#[derive(Deserialize, Debug, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "utoipa", schema(example = json!({
- "type": "custom.schema",
- "version": "1.0.0",
- "json_schema": {
+ "schemaType": "custom.schema",
+ "schemaVersion": "1.0.0",
+ "schema": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "FinancialTransaction",
"type": "object",
- "required": ["transaction_id", "amount", "currency", "timestamp"],
+ "required": ["transactionId", "amount", "currency", "timestamp"],
"properties": {
- "transaction_id": {
+ "transactionId": {
"type": "string",
"format": "uuid"
},
@@ -39,20 +41,21 @@ use crate::ConfigurationError;
},
}
},
- "created_at": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(),
- "updated_at": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(),
+ "createdAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(),
+ "updatedAt": time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339).unwrap(),
})))]
+#[serde(rename_all = "camelCase")]
pub struct TransactionSchema {
- #[serde(rename = "type")]
/// Transaction schema type
- pub kind: String,
+ pub schema_type: String,
/// The schema's version
- pub version: String,
+ pub schema_version: String,
/// JSON schema for transcation
- #[serde(rename = "json_schema")]
pub schema: serde_json::Value,
+ /// When the schema was created
#[serde(with = "time::serde::rfc3339")]
pub created_at: OffsetDateTime,
+ /// When the schema was last updated
#[serde(with = "time::serde::rfc3339")]
pub updated_at: OffsetDateTime,
}
@@ -61,49 +64,53 @@ pub struct TransactionSchema {
pub trait SchemaDriver {
async fn create_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
schema: &serde_json::Value,
) -> Result<TransactionSchema, ConfigurationError>;
async fn delete_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
) -> Result<(), ConfigurationError>;
async fn get_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
) -> Result<Option<TransactionSchema>, ConfigurationError>;
async fn update_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
schema: &serde_json::Value,
- ) -> Result<TransactionSchema, ConfigurationError>;
+ ) -> Result<Option<TransactionSchema>, ConfigurationError>;
+
+ async fn get_schemas(
+ &self,
+ limit: i64,
+ first: Option<i64>,
+ after: Option<impl AsRef<str> + Send + Sync + Debug>,
+ ) -> Result<Vec<TransactionSchema>, ConfigurationError>;
}
#[async_trait]
impl SchemaDriver for AppState {
+ #[tracing::instrument(skip(self, schema))]
async fn create_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
schema: &serde_json::Value,
) -> Result<TransactionSchema, crate::ConfigurationError> {
+ debug!("creating transaction schema");
sqlx::query_as!(
TransactionSchema,
- "insert into transaction_schema (type, version, json_schema) values ($1, $2, $3)
- returning
- type as kind,
- version,
- json_schema as schema,
- created_at,
- updated_at
- ",
+ "insert into transaction_schema (schema_type, schema_version, schema) values ($1, $2, $3)
+ returning *
+ ",
kind.as_ref(),
version.as_ref(),
sqlx::types::Json(&schema) as _
@@ -113,13 +120,15 @@ impl SchemaDriver for AppState {
.map_err(|e| e.into())
}
+ #[tracing::instrument(skip(self))]
async fn delete_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
) -> Result<(), crate::ConfigurationError> {
+ debug!("deleting transaction schema");
sqlx::query!(
- "delete from transaction_schema where type = $1 and version = $2",
+ "delete from transaction_schema where schema_type = $1 and schema_version = $2",
kind.as_ref(),
version.as_ref(),
)
@@ -128,20 +137,18 @@ impl SchemaDriver for AppState {
Ok(())
}
+ #[tracing::instrument(skip(self))]
async fn get_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
) -> Result<Option<TransactionSchema>, crate::ConfigurationError> {
+ debug!("getting transaction schema");
let result = sqlx::query_as!(
TransactionSchema,
"select
- type as kind,
- version,
- json_schema as schema,
- created_at,
- updated_at
- from transaction_schema where type = $1 and version = $2",
+ *
+ from transaction_schema where schema_type = $1 and schema_version = $2",
kind.as_ref(),
version.as_ref(),
)
@@ -151,34 +158,71 @@ impl SchemaDriver for AppState {
Ok(result)
}
+ #[tracing::instrument(skip(self, schema))]
async fn update_schema(
&self,
- kind: impl AsRef<str> + Send + Sync,
- version: impl AsRef<str> + Send + Sync,
+ kind: impl AsRef<str> + Send + Sync + Debug,
+ version: impl AsRef<str> + Send + Sync + Debug,
schema: &serde_json::Value,
- ) -> Result<TransactionSchema, crate::ConfigurationError> {
- sqlx::query_as!(TransactionSchema,
- "
+ ) -> Result<Option<TransactionSchema>, crate::ConfigurationError> {
+ debug!("updating transaction schema");
+ sqlx::query_as!(
+ TransactionSchema,
+ "
update
transaction_schema
set
- json_schema = $3
+ schema = $3
where
- type = $1
- and version = $2
- returning
- type as kind,
- version,
- json_schema as schema,
- created_at,
- updated_at
+ schema_type = $1
+ and schema_version = $2
+ returning *
",
kind.as_ref(),
version.as_ref(),
sqlx::types::Json(&schema) as _
)
- .fetch_one(&self.database)
+ .fetch_optional(&self.database)
.await
.map_err(|e| e.into())
}
+
+ #[tracing::instrument(skip(self))]
+ async fn get_schemas(
+ &self,
+ limit: i64,
+ first: Option<i64>,
+ after: Option<impl AsRef<str> + Send + Sync + Debug>,
+ ) -> Result<Vec<TransactionSchema>, ConfigurationError> {
+ debug!("getting transaction schemas");
+ let limit = first.unwrap_or(limit);
+ let mut last_type = String::default();
+ let mut last_version = String::default();
+
+ if let Some(s) = after {
+ let parts: Vec<&str> = s.as_ref().split(',').collect();
+ if parts.len() == 2 {
+ last_type = parts[0].to_string();
+ last_version = parts[1].to_string();
+ }
+ }
+
+ let rows = sqlx::query_as!(
+ TransactionSchema,
+ "
+ select *
+ from transaction_schema
+ where ($1 = '' or (schema_type, schema_version) > ($1, $2))
+ order by schema_type asc, schema_version asc
+ limit $3
+ ",
+ &last_type,
+ &last_version,
+ limit + 1
+ )
+ .fetch_all(&self.database)
+ .await?;
+
+ Ok(rows)
+ }
}