diff options
| -rw-r--r-- | lib/api-config/src/schema/mod.rs | 66 | ||||
| -rw-r--r-- | lib/warden-core/src/lib.rs | 1 | ||||
| -rw-r--r-- | lib/warden-core/src/state/mod.rs | 28 | ||||
| -rw-r--r-- | warden/src/main.rs | 11 | ||||
| -rw-r--r-- | warden/src/server/mod.rs | 3 | ||||
| -rw-r--r-- | warden/src/server/routes/config/logs.rs | 3 | ||||
| -rw-r--r-- | warden/src/server/routes/config/mod.rs | 3 | ||||
| -rw-r--r-- | warden/src/server/routes/config/schema/create.rs | 7 | ||||
| -rw-r--r-- | warden/src/server/routes/config/schema/delete.rs | 6 | ||||
| -rw-r--r-- | warden/src/server/routes/config/schema/mod.rs | 3 | ||||
| -rw-r--r-- | warden/src/server/routes/config/schema/read.rs | 9 | ||||
| -rw-r--r-- | warden/src/server/routes/config/schema/update.rs | 8 | ||||
| -rw-r--r-- | warden/src/server/routes/transaction_monitoring/mod.rs | 3 | ||||
| -rw-r--r-- | warden/src/server/routes/transaction_monitoring/monitor.rs | 5 | ||||
| -rw-r--r-- | warden/src/state/database.rs (renamed from lib/warden-core/src/state/database.rs) | 4 | ||||
| -rw-r--r-- | warden/src/state/mod.rs | 33 |
16 files changed, 102 insertions, 91 deletions
diff --git a/lib/api-config/src/schema/mod.rs b/lib/api-config/src/schema/mod.rs index 7c65d49..f626e87 100644 --- a/lib/api-config/src/schema/mod.rs +++ b/lib/api-config/src/schema/mod.rs @@ -1,12 +1,12 @@ mod create; pub use create::CreateSchema; +use sqlx::PgPool; use tracing::debug; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use time::OffsetDateTime; -use warden_core::state::AppState; use crate::ConfigurationError; @@ -60,31 +60,31 @@ pub struct TransactionSchema { pub updated_at: OffsetDateTime, } +pub struct SchemaService { + pub database: PgPool, +} + #[async_trait] -pub trait SchemaDriver { +pub trait SchemaDriver: Send + Sync { async fn create_schema( &self, - kind: impl AsRef<str> + Send + Sync + Debug, - version: impl AsRef<str> + Send + Sync + Debug, + kind: &str, + version: &str, schema: &serde_json::Value, ) -> Result<TransactionSchema, ConfigurationError>; - async fn delete_schema( - &self, - kind: impl AsRef<str> + Send + Sync + Debug, - version: impl AsRef<str> + Send + Sync + Debug, - ) -> Result<(), ConfigurationError>; + async fn delete_schema(&self, kind: &str, version: &str) -> Result<(), ConfigurationError>; async fn get_schema( &self, - kind: impl AsRef<str> + Send + Sync + Debug, - version: impl AsRef<str> + Send + Sync + Debug, + kind: &str, + version: &str, ) -> Result<Option<TransactionSchema>, ConfigurationError>; async fn update_schema( &self, - kind: impl AsRef<str> + Send + Sync + Debug, - version: impl AsRef<str> + Send + Sync + Debug, + kind: &str, + version: &str, schema: &serde_json::Value, ) -> Result<Option<TransactionSchema>, ConfigurationError>; @@ -92,17 +92,17 @@ pub trait SchemaDriver { &self, limit: i64, first: Option<i64>, - after: Option<impl AsRef<str> + Send + Sync + Debug>, + after: Option<&str>, ) -> Result<Vec<TransactionSchema>, ConfigurationError>; } #[async_trait] -impl SchemaDriver for AppState { +impl SchemaDriver for SchemaService { #[tracing::instrument(skip(self, schema))] async fn create_schema( &self, - kind: impl AsRef<str> + Send + Sync + Debug, - version: impl AsRef<str> + Send + Sync + Debug, + kind: &str, + version: &str, schema: &serde_json::Value, ) -> Result<TransactionSchema, crate::ConfigurationError> { debug!("creating transaction schema"); @@ -111,8 +111,8 @@ impl SchemaDriver for AppState { "insert into transaction_schema (schema_type, schema_version, schema) values ($1, $2, $3) returning * ", - kind.as_ref(), - version.as_ref(), + kind, + version, sqlx::types::Json(&schema) as _ ) .fetch_one(&self.database) @@ -123,14 +123,14 @@ impl SchemaDriver for AppState { #[tracing::instrument(skip(self))] async fn delete_schema( &self, - kind: impl AsRef<str> + Send + Sync + Debug, - version: impl AsRef<str> + Send + Sync + Debug, + kind: &str, + version: &str, ) -> Result<(), crate::ConfigurationError> { debug!("deleting transaction schema"); sqlx::query!( "delete from transaction_schema where schema_type = $1 and schema_version = $2", - kind.as_ref(), - version.as_ref(), + kind, + version, ) .execute(&self.database) .await?; @@ -140,8 +140,8 @@ impl SchemaDriver for AppState { #[tracing::instrument(skip(self))] async fn get_schema( &self, - kind: impl AsRef<str> + Send + Sync + Debug, - version: impl AsRef<str> + Send + Sync + Debug, + kind: &str, + version: &str, ) -> Result<Option<TransactionSchema>, crate::ConfigurationError> { debug!("getting transaction schema"); let result = sqlx::query_as!( @@ -149,8 +149,8 @@ impl SchemaDriver for AppState { "select * from transaction_schema where schema_type = $1 and schema_version = $2", - kind.as_ref(), - version.as_ref(), + kind, + version, ) .fetch_optional(&self.database) .await?; @@ -161,8 +161,8 @@ impl SchemaDriver for AppState { #[tracing::instrument(skip(self, schema))] async fn update_schema( &self, - kind: impl AsRef<str> + Send + Sync + Debug, - version: impl AsRef<str> + Send + Sync + Debug, + kind: &str, + version: &str, schema: &serde_json::Value, ) -> Result<Option<TransactionSchema>, crate::ConfigurationError> { debug!("updating transaction schema"); @@ -178,8 +178,8 @@ impl SchemaDriver for AppState { and schema_version = $2 returning * ", - kind.as_ref(), - version.as_ref(), + kind, + version, sqlx::types::Json(&schema) as _ ) .fetch_optional(&self.database) @@ -192,7 +192,7 @@ impl SchemaDriver for AppState { &self, limit: i64, first: Option<i64>, - after: Option<impl AsRef<str> + Send + Sync + Debug>, + after: Option<&str>, ) -> Result<Vec<TransactionSchema>, ConfigurationError> { debug!("getting transaction schemas"); let limit = first.unwrap_or(limit); @@ -200,7 +200,7 @@ impl SchemaDriver for AppState { let mut last_version = String::default(); if let Some(s) = after { - let parts: Vec<&str> = s.as_ref().split(',').collect(); + let parts: Vec<&str> = s.split(',').collect(); if parts.len() == 2 { last_type = parts[0].to_string(); last_version = parts[1].to_string(); diff --git a/lib/warden-core/src/lib.rs b/lib/warden-core/src/lib.rs index 413087b..f200ba1 100644 --- a/lib/warden-core/src/lib.rs +++ b/lib/warden-core/src/lib.rs @@ -1,4 +1,3 @@ mod error; pub use error::WardenError; pub mod config; -pub mod state; diff --git a/lib/warden-core/src/state/mod.rs b/lib/warden-core/src/state/mod.rs deleted file mode 100644 index 18e44b8..0000000 --- a/lib/warden-core/src/state/mod.rs +++ /dev/null @@ -1,28 +0,0 @@ -pub(crate) mod database; -use sqlx::PgPool; -use tracing::{debug, trace}; -use tracing_subscriber::EnvFilter; - -use crate::{WardenError, config::Configuration}; - -pub type LogHandle = tracing_subscriber::reload::Handle<EnvFilter, tracing_subscriber::Registry>; - -#[derive(Debug, Clone)] -pub struct AppState { - pub log_handle: LogHandle, - pub database: PgPool, -} - -impl AppState { - pub async fn new(log_handle: LogHandle, config: &Configuration) -> Result<Self, WardenError> { - let database = database::connect(&config.database).await?; - trace!("running database migrations"); - sqlx::migrate!("../../migrations").run(&database).await?; - debug!("database up to date"); - - Ok(Self { - log_handle, - database, - }) - } -} diff --git a/warden/src/main.rs b/warden/src/main.rs index 7daad57..e155dee 100644 --- a/warden/src/main.rs +++ b/warden/src/main.rs @@ -1,3 +1,4 @@ +use api_config::schema::SchemaService; use std::{ io::Write, net::{Ipv6Addr, SocketAddr}, @@ -8,12 +9,13 @@ use tokio::net::TcpListener; use anyhow::Context as _; use clap::Parser as _; use tracing::info; -use warden_core::{config, state::AppState}; +use warden_core::config; use crate::config::{Commands, Configuration}; mod logging; mod server; +mod state; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -35,7 +37,12 @@ async fn main() -> anyhow::Result<()> { let (log_handle, _guard) = logging::initialise_logging(&config.server.log_level, &config.server.log_dir); - let state = AppState::new(log_handle, &config).await?; + let schema = SchemaService { + database: state::database::connect(&config.database).await?, + }; + let schema = Arc::new(schema); + + let state = state::AppState::new(log_handle, schema).await?; let app = server::router(Arc::new(state), &config).await; let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.server.port)); diff --git a/warden/src/server/mod.rs b/warden/src/server/mod.rs index c9bab23..8c68ed8 100644 --- a/warden/src/server/mod.rs +++ b/warden/src/server/mod.rs @@ -13,14 +13,13 @@ use tower_http::{ }; use utoipa::OpenApi; use utoipa_axum::router::OpenApiRouter; -use warden_core::state::AppState; use crate::{ config::Configuration, server::{ middleware::request_id::{REQUEST_ID_HEADER, middleware_request_id}, routes::{ApiDoc, config::ConfigDoc, transaction_monitoring::MonitoringDoc}, - }, + }, state::AppState, }; pub mod api; diff --git a/warden/src/server/routes/config/logs.rs b/warden/src/server/routes/config/logs.rs index 67bd14b..9e8b2a6 100644 --- a/warden/src/server/routes/config/logs.rs +++ b/warden/src/server/routes/config/logs.rs @@ -3,9 +3,8 @@ use std::sync::Arc; use axum::{extract::State, http::StatusCode}; use serde::Deserialize; use utoipa::ToSchema; -use warden_core::state::AppState; -use crate::server::{api::SafeJson, routes::config::CONFIG}; +use crate::{server::{api::SafeJson, routes::config::CONFIG}, state::AppState}; #[derive(Deserialize, Debug, Clone, ToSchema)] /// Log level diff --git a/warden/src/server/routes/config/mod.rs b/warden/src/server/routes/config/mod.rs index d5344f9..5a16be9 100644 --- a/warden/src/server/routes/config/mod.rs +++ b/warden/src/server/routes/config/mod.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use utoipa::OpenApi; use utoipa_axum::router::OpenApiRouter; -use warden_core::state::AppState; + +use crate::state::AppState; const CONFIG: &str = "Configuration"; diff --git a/warden/src/server/routes/config/schema/create.rs b/warden/src/server/routes/config/schema/create.rs index b2d3051..d864f32 100644 --- a/warden/src/server/routes/config/schema/create.rs +++ b/warden/src/server/routes/config/schema/create.rs @@ -8,13 +8,12 @@ use axum::{ response::IntoResponse, }; use tracing::{info, trace}; -use warden_core::state::AppState; -use crate::server::{ +use crate::{server::{ api::version::{Version, VersionPath}, error::AppError, routes::config::CONFIG, -}; +}, state::AppState}; /// Save a transaction's schema #[utoipa::path( @@ -91,7 +90,7 @@ pub async fn create_schema( })?; info!("schema is valid. trying to save..."); - let result = state + let result = state.schema_service .create_schema(&body.schema_type, &body.schema_version, &body.schema) .await .map_err(|e| match e { diff --git a/warden/src/server/routes/config/schema/delete.rs b/warden/src/server/routes/config/schema/delete.rs index a761203..8fcc807 100644 --- a/warden/src/server/routes/config/schema/delete.rs +++ b/warden/src/server/routes/config/schema/delete.rs @@ -9,13 +9,12 @@ use axum::{ }; use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; -use warden_core::state::AppState; -use crate::server::{ +use crate::{server::{ api::version::{Version, VersionPath}, error::AppError, routes::config::CONFIG, -}; +}, state::AppState}; /// Schema delete query #[derive(Deserialize, Serialize, IntoParams, ToSchema)] @@ -86,6 +85,7 @@ pub async fn delete_schema( ) -> Result<impl IntoResponse, AppError> { // TODO: should also clear cached ones eventually state + .schema_service .delete_schema(&body.schema_type, &body.schema_version) .await?; Ok(StatusCode::NO_CONTENT) diff --git a/warden/src/server/routes/config/schema/mod.rs b/warden/src/server/routes/config/schema/mod.rs index 3fbdb81..8c84f10 100644 --- a/warden/src/server/routes/config/schema/mod.rs +++ b/warden/src/server/routes/config/schema/mod.rs @@ -1,7 +1,8 @@ use std::sync::Arc; use utoipa_axum::router::OpenApiRouter; -use warden_core::state::AppState; + +use crate::state::AppState; pub mod create; pub mod delete; diff --git a/warden/src/server/routes/config/schema/read.rs b/warden/src/server/routes/config/schema/read.rs index 1e87626..a80f88f 100644 --- a/warden/src/server/routes/config/schema/read.rs +++ b/warden/src/server/routes/config/schema/read.rs @@ -9,16 +9,15 @@ use axum::{ }; use base64::{Engine, engine::general_purpose}; use tracing::debug; -use warden_core::state::AppState; -use crate::server::{ +use crate::{server::{ api::{ pagination::{PageInfo, PaginationParams, RelayResponse}, version::{Version, VersionPath}, }, error::AppError, routes::config::CONFIG, -}; +}, state::AppState}; /// Get a transaction's schema #[utoipa::path( @@ -89,6 +88,7 @@ pub async fn get_schema( debug!("searching for schema"); // TODO: get from cache let result = state + .schema_service .get_schema(&schema_type, &schema_version) .await .map_err(|e| match e { @@ -186,7 +186,8 @@ pub async fn get_schemas( // TODO: get from cache let rows = state - .get_schemas(limit, params.first, cursor) + .schema_service + .get_schemas(limit, params.first, cursor.as_deref()) .await .map_err(|e| match e { api_config::ConfigurationError::Database(ref error) => match error { diff --git a/warden/src/server/routes/config/schema/update.rs b/warden/src/server/routes/config/schema/update.rs index f116e0f..aaf1773 100644 --- a/warden/src/server/routes/config/schema/update.rs +++ b/warden/src/server/routes/config/schema/update.rs @@ -7,13 +7,12 @@ use axum::{ http::{HeaderMap, StatusCode}, response::IntoResponse, }; -use warden_core::state::AppState; -use crate::server::{ +use crate::{server::{ api::version::{Version, VersionPath}, error::AppError, routes::config::CONFIG, -}; +}, state::AppState}; /// Update a transaction's schema #[utoipa::path( @@ -86,7 +85,8 @@ pub async fn update_schema( ) -> Result<impl IntoResponse, AppError> { // TODO: should also clear cached ones eventually let result = state - .update_schema(schema_type, schema_version, &body) + .schema_service + .update_schema(&schema_type, &schema_version, &body) .await?; if let Some(result) = result { diff --git a/warden/src/server/routes/transaction_monitoring/mod.rs b/warden/src/server/routes/transaction_monitoring/mod.rs index 459d75d..da923d4 100644 --- a/warden/src/server/routes/transaction_monitoring/mod.rs +++ b/warden/src/server/routes/transaction_monitoring/mod.rs @@ -2,7 +2,8 @@ use std::sync::Arc; use utoipa::OpenApi; use utoipa_axum::router::OpenApiRouter; -use warden_core::state::AppState; + +use crate::state::AppState; pub mod monitor; diff --git a/warden/src/server/routes/transaction_monitoring/monitor.rs b/warden/src/server/routes/transaction_monitoring/monitor.rs index 817dbca..dedb6b1 100644 --- a/warden/src/server/routes/transaction_monitoring/monitor.rs +++ b/warden/src/server/routes/transaction_monitoring/monitor.rs @@ -1,17 +1,16 @@ use std::sync::Arc; -use crate::server::{ +use crate::{server::{ api::{ transaction::Transaction, version::{Version, VersionPath}, }, middleware::extractors::transaction::ValidatedTransaction, -}; +}, state::AppState}; use axum::{ extract::{Path, State}, http::StatusCode, }; -use warden_core::state::AppState; /// Submit a transaction for monitoring #[utoipa::path( diff --git a/lib/warden-core/src/state/database.rs b/warden/src/state/database.rs index 4167424..cc79f13 100644 --- a/lib/warden-core/src/state/database.rs +++ b/warden/src/state/database.rs @@ -1,9 +1,9 @@ use sqlx::PgPool; use tracing::{debug, error}; -use crate::{WardenError, config::cli::database::Database}; +use crate::config::cli::database::Database; -pub(crate) async fn connect(config: &Database) -> Result<PgPool, WardenError> { +pub(crate) async fn connect(config: &Database) -> anyhow::Result<PgPool> { let url = config.get_url()?; let host = url.host_str(); debug!(host = host, "connecting to database"); diff --git a/warden/src/state/mod.rs b/warden/src/state/mod.rs new file mode 100644 index 0000000..7dfcdc4 --- /dev/null +++ b/warden/src/state/mod.rs @@ -0,0 +1,33 @@ +pub(crate) mod database; +use std::sync::Arc; + +use api_config::schema::SchemaDriver; +use tracing::{debug, trace}; +use tracing_subscriber::EnvFilter; + +use crate::config::Configuration; + +pub type LogHandle = tracing_subscriber::reload::Handle<EnvFilter, tracing_subscriber::Registry>; + +#[derive(Clone)] +pub struct AppState { + pub log_handle: LogHandle, + pub schema_service: Arc<dyn SchemaDriver>, +} + +impl AppState { + pub async fn new( + log_handle: LogHandle, + schema_service: Arc<dyn SchemaDriver>, + ) -> anyhow::Result<Self> { + // let database = database::connect(&config.database).await?; + // trace!("running database migrations"); + // sqlx::migrate!("../../../migrations").run(&database).await?; + // debug!("database up to date"); + + Ok(Self { + log_handle, + schema_service, + }) + } +} |
