aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2026-03-30 17:46:25 +0200
committerrtkay123 <dev@kanjala.com>2026-03-30 17:46:25 +0200
commitcec58d78e968250e4c589899eab460d1132f6d01 (patch)
tree53fcd3e1781ebf337a6ce56ca726cd91457e258c
parent4071482f983d66b16cc8a5519f5665990dc7bc02 (diff)
downloadwarden-cec58d78e968250e4c589899eab460d1132f6d01.tar.bz2
warden-cec58d78e968250e4c589899eab460d1132f6d01.zip
refactor: generic svcs
-rw-r--r--lib/api-config/src/schema/mod.rs66
-rw-r--r--lib/warden-core/src/lib.rs1
-rw-r--r--lib/warden-core/src/state/mod.rs28
-rw-r--r--warden/src/main.rs11
-rw-r--r--warden/src/server/mod.rs3
-rw-r--r--warden/src/server/routes/config/logs.rs3
-rw-r--r--warden/src/server/routes/config/mod.rs3
-rw-r--r--warden/src/server/routes/config/schema/create.rs7
-rw-r--r--warden/src/server/routes/config/schema/delete.rs6
-rw-r--r--warden/src/server/routes/config/schema/mod.rs3
-rw-r--r--warden/src/server/routes/config/schema/read.rs9
-rw-r--r--warden/src/server/routes/config/schema/update.rs8
-rw-r--r--warden/src/server/routes/transaction_monitoring/mod.rs3
-rw-r--r--warden/src/server/routes/transaction_monitoring/monitor.rs5
-rw-r--r--warden/src/state/database.rs (renamed from lib/warden-core/src/state/database.rs)4
-rw-r--r--warden/src/state/mod.rs33
16 files changed, 102 insertions, 91 deletions
diff --git a/lib/api-config/src/schema/mod.rs b/lib/api-config/src/schema/mod.rs
index 7c65d49..f626e87 100644
--- a/lib/api-config/src/schema/mod.rs
+++ b/lib/api-config/src/schema/mod.rs
@@ -1,12 +1,12 @@
mod create;
pub use create::CreateSchema;
+use sqlx::PgPool;
use tracing::debug;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use time::OffsetDateTime;
-use warden_core::state::AppState;
use crate::ConfigurationError;
@@ -60,31 +60,31 @@ pub struct TransactionSchema {
pub updated_at: OffsetDateTime,
}
+pub struct SchemaService {
+ pub database: PgPool,
+}
+
#[async_trait]
-pub trait SchemaDriver {
+pub trait SchemaDriver: Send + Sync {
async fn create_schema(
&self,
- kind: impl AsRef<str> + Send + Sync + Debug,
- version: impl AsRef<str> + Send + Sync + Debug,
+ kind: &str,
+ version: &str,
schema: &serde_json::Value,
) -> Result<TransactionSchema, ConfigurationError>;
- async fn delete_schema(
- &self,
- kind: impl AsRef<str> + Send + Sync + Debug,
- version: impl AsRef<str> + Send + Sync + Debug,
- ) -> Result<(), ConfigurationError>;
+ async fn delete_schema(&self, kind: &str, version: &str) -> Result<(), ConfigurationError>;
async fn get_schema(
&self,
- kind: impl AsRef<str> + Send + Sync + Debug,
- version: impl AsRef<str> + Send + Sync + Debug,
+ kind: &str,
+ version: &str,
) -> Result<Option<TransactionSchema>, ConfigurationError>;
async fn update_schema(
&self,
- kind: impl AsRef<str> + Send + Sync + Debug,
- version: impl AsRef<str> + Send + Sync + Debug,
+ kind: &str,
+ version: &str,
schema: &serde_json::Value,
) -> Result<Option<TransactionSchema>, ConfigurationError>;
@@ -92,17 +92,17 @@ pub trait SchemaDriver {
&self,
limit: i64,
first: Option<i64>,
- after: Option<impl AsRef<str> + Send + Sync + Debug>,
+ after: Option<&str>,
) -> Result<Vec<TransactionSchema>, ConfigurationError>;
}
#[async_trait]
-impl SchemaDriver for AppState {
+impl SchemaDriver for SchemaService {
#[tracing::instrument(skip(self, schema))]
async fn create_schema(
&self,
- kind: impl AsRef<str> + Send + Sync + Debug,
- version: impl AsRef<str> + Send + Sync + Debug,
+ kind: &str,
+ version: &str,
schema: &serde_json::Value,
) -> Result<TransactionSchema, crate::ConfigurationError> {
debug!("creating transaction schema");
@@ -111,8 +111,8 @@ impl SchemaDriver for AppState {
"insert into transaction_schema (schema_type, schema_version, schema) values ($1, $2, $3)
returning *
",
- kind.as_ref(),
- version.as_ref(),
+ kind,
+ version,
sqlx::types::Json(&schema) as _
)
.fetch_one(&self.database)
@@ -123,14 +123,14 @@ impl SchemaDriver for AppState {
#[tracing::instrument(skip(self))]
async fn delete_schema(
&self,
- kind: impl AsRef<str> + Send + Sync + Debug,
- version: impl AsRef<str> + Send + Sync + Debug,
+ kind: &str,
+ version: &str,
) -> Result<(), crate::ConfigurationError> {
debug!("deleting transaction schema");
sqlx::query!(
"delete from transaction_schema where schema_type = $1 and schema_version = $2",
- kind.as_ref(),
- version.as_ref(),
+ kind,
+ version,
)
.execute(&self.database)
.await?;
@@ -140,8 +140,8 @@ impl SchemaDriver for AppState {
#[tracing::instrument(skip(self))]
async fn get_schema(
&self,
- kind: impl AsRef<str> + Send + Sync + Debug,
- version: impl AsRef<str> + Send + Sync + Debug,
+ kind: &str,
+ version: &str,
) -> Result<Option<TransactionSchema>, crate::ConfigurationError> {
debug!("getting transaction schema");
let result = sqlx::query_as!(
@@ -149,8 +149,8 @@ impl SchemaDriver for AppState {
"select
*
from transaction_schema where schema_type = $1 and schema_version = $2",
- kind.as_ref(),
- version.as_ref(),
+ kind,
+ version,
)
.fetch_optional(&self.database)
.await?;
@@ -161,8 +161,8 @@ impl SchemaDriver for AppState {
#[tracing::instrument(skip(self, schema))]
async fn update_schema(
&self,
- kind: impl AsRef<str> + Send + Sync + Debug,
- version: impl AsRef<str> + Send + Sync + Debug,
+ kind: &str,
+ version: &str,
schema: &serde_json::Value,
) -> Result<Option<TransactionSchema>, crate::ConfigurationError> {
debug!("updating transaction schema");
@@ -178,8 +178,8 @@ impl SchemaDriver for AppState {
and schema_version = $2
returning *
",
- kind.as_ref(),
- version.as_ref(),
+ kind,
+ version,
sqlx::types::Json(&schema) as _
)
.fetch_optional(&self.database)
@@ -192,7 +192,7 @@ impl SchemaDriver for AppState {
&self,
limit: i64,
first: Option<i64>,
- after: Option<impl AsRef<str> + Send + Sync + Debug>,
+ after: Option<&str>,
) -> Result<Vec<TransactionSchema>, ConfigurationError> {
debug!("getting transaction schemas");
let limit = first.unwrap_or(limit);
@@ -200,7 +200,7 @@ impl SchemaDriver for AppState {
let mut last_version = String::default();
if let Some(s) = after {
- let parts: Vec<&str> = s.as_ref().split(',').collect();
+ let parts: Vec<&str> = s.split(',').collect();
if parts.len() == 2 {
last_type = parts[0].to_string();
last_version = parts[1].to_string();
diff --git a/lib/warden-core/src/lib.rs b/lib/warden-core/src/lib.rs
index 413087b..f200ba1 100644
--- a/lib/warden-core/src/lib.rs
+++ b/lib/warden-core/src/lib.rs
@@ -1,4 +1,3 @@
mod error;
pub use error::WardenError;
pub mod config;
-pub mod state;
diff --git a/lib/warden-core/src/state/mod.rs b/lib/warden-core/src/state/mod.rs
deleted file mode 100644
index 18e44b8..0000000
--- a/lib/warden-core/src/state/mod.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-pub(crate) mod database;
-use sqlx::PgPool;
-use tracing::{debug, trace};
-use tracing_subscriber::EnvFilter;
-
-use crate::{WardenError, config::Configuration};
-
-pub type LogHandle = tracing_subscriber::reload::Handle<EnvFilter, tracing_subscriber::Registry>;
-
-#[derive(Debug, Clone)]
-pub struct AppState {
- pub log_handle: LogHandle,
- pub database: PgPool,
-}
-
-impl AppState {
- pub async fn new(log_handle: LogHandle, config: &Configuration) -> Result<Self, WardenError> {
- let database = database::connect(&config.database).await?;
- trace!("running database migrations");
- sqlx::migrate!("../../migrations").run(&database).await?;
- debug!("database up to date");
-
- Ok(Self {
- log_handle,
- database,
- })
- }
-}
diff --git a/warden/src/main.rs b/warden/src/main.rs
index 7daad57..e155dee 100644
--- a/warden/src/main.rs
+++ b/warden/src/main.rs
@@ -1,3 +1,4 @@
+use api_config::schema::SchemaService;
use std::{
io::Write,
net::{Ipv6Addr, SocketAddr},
@@ -8,12 +9,13 @@ use tokio::net::TcpListener;
use anyhow::Context as _;
use clap::Parser as _;
use tracing::info;
-use warden_core::{config, state::AppState};
+use warden_core::config;
use crate::config::{Commands, Configuration};
mod logging;
mod server;
+mod state;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@@ -35,7 +37,12 @@ async fn main() -> anyhow::Result<()> {
let (log_handle, _guard) =
logging::initialise_logging(&config.server.log_level, &config.server.log_dir);
- let state = AppState::new(log_handle, &config).await?;
+ let schema = SchemaService {
+ database: state::database::connect(&config.database).await?,
+ };
+ let schema = Arc::new(schema);
+
+ let state = state::AppState::new(log_handle, schema).await?;
let app = server::router(Arc::new(state), &config).await;
let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.server.port));
diff --git a/warden/src/server/mod.rs b/warden/src/server/mod.rs
index c9bab23..8c68ed8 100644
--- a/warden/src/server/mod.rs
+++ b/warden/src/server/mod.rs
@@ -13,14 +13,13 @@ use tower_http::{
};
use utoipa::OpenApi;
use utoipa_axum::router::OpenApiRouter;
-use warden_core::state::AppState;
use crate::{
config::Configuration,
server::{
middleware::request_id::{REQUEST_ID_HEADER, middleware_request_id},
routes::{ApiDoc, config::ConfigDoc, transaction_monitoring::MonitoringDoc},
- },
+ }, state::AppState,
};
pub mod api;
diff --git a/warden/src/server/routes/config/logs.rs b/warden/src/server/routes/config/logs.rs
index 67bd14b..9e8b2a6 100644
--- a/warden/src/server/routes/config/logs.rs
+++ b/warden/src/server/routes/config/logs.rs
@@ -3,9 +3,8 @@ use std::sync::Arc;
use axum::{extract::State, http::StatusCode};
use serde::Deserialize;
use utoipa::ToSchema;
-use warden_core::state::AppState;
-use crate::server::{api::SafeJson, routes::config::CONFIG};
+use crate::{server::{api::SafeJson, routes::config::CONFIG}, state::AppState};
#[derive(Deserialize, Debug, Clone, ToSchema)]
/// Log level
diff --git a/warden/src/server/routes/config/mod.rs b/warden/src/server/routes/config/mod.rs
index d5344f9..5a16be9 100644
--- a/warden/src/server/routes/config/mod.rs
+++ b/warden/src/server/routes/config/mod.rs
@@ -4,7 +4,8 @@ use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_axum::router::OpenApiRouter;
-use warden_core::state::AppState;
+
+use crate::state::AppState;
const CONFIG: &str = "Configuration";
diff --git a/warden/src/server/routes/config/schema/create.rs b/warden/src/server/routes/config/schema/create.rs
index b2d3051..d864f32 100644
--- a/warden/src/server/routes/config/schema/create.rs
+++ b/warden/src/server/routes/config/schema/create.rs
@@ -8,13 +8,12 @@ use axum::{
response::IntoResponse,
};
use tracing::{info, trace};
-use warden_core::state::AppState;
-use crate::server::{
+use crate::{server::{
api::version::{Version, VersionPath},
error::AppError,
routes::config::CONFIG,
-};
+}, state::AppState};
/// Save a transaction's schema
#[utoipa::path(
@@ -91,7 +90,7 @@ pub async fn create_schema(
})?;
info!("schema is valid. trying to save...");
- let result = state
+ let result = state.schema_service
.create_schema(&body.schema_type, &body.schema_version, &body.schema)
.await
.map_err(|e| match e {
diff --git a/warden/src/server/routes/config/schema/delete.rs b/warden/src/server/routes/config/schema/delete.rs
index a761203..8fcc807 100644
--- a/warden/src/server/routes/config/schema/delete.rs
+++ b/warden/src/server/routes/config/schema/delete.rs
@@ -9,13 +9,12 @@ use axum::{
};
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};
-use warden_core::state::AppState;
-use crate::server::{
+use crate::{server::{
api::version::{Version, VersionPath},
error::AppError,
routes::config::CONFIG,
-};
+}, state::AppState};
/// Schema delete query
#[derive(Deserialize, Serialize, IntoParams, ToSchema)]
@@ -86,6 +85,7 @@ pub async fn delete_schema(
) -> Result<impl IntoResponse, AppError> {
// TODO: should also clear cached ones eventually
state
+ .schema_service
.delete_schema(&body.schema_type, &body.schema_version)
.await?;
Ok(StatusCode::NO_CONTENT)
diff --git a/warden/src/server/routes/config/schema/mod.rs b/warden/src/server/routes/config/schema/mod.rs
index 3fbdb81..8c84f10 100644
--- a/warden/src/server/routes/config/schema/mod.rs
+++ b/warden/src/server/routes/config/schema/mod.rs
@@ -1,7 +1,8 @@
use std::sync::Arc;
use utoipa_axum::router::OpenApiRouter;
-use warden_core::state::AppState;
+
+use crate::state::AppState;
pub mod create;
pub mod delete;
diff --git a/warden/src/server/routes/config/schema/read.rs b/warden/src/server/routes/config/schema/read.rs
index 1e87626..a80f88f 100644
--- a/warden/src/server/routes/config/schema/read.rs
+++ b/warden/src/server/routes/config/schema/read.rs
@@ -9,16 +9,15 @@ use axum::{
};
use base64::{Engine, engine::general_purpose};
use tracing::debug;
-use warden_core::state::AppState;
-use crate::server::{
+use crate::{server::{
api::{
pagination::{PageInfo, PaginationParams, RelayResponse},
version::{Version, VersionPath},
},
error::AppError,
routes::config::CONFIG,
-};
+}, state::AppState};
/// Get a transaction's schema
#[utoipa::path(
@@ -89,6 +88,7 @@ pub async fn get_schema(
debug!("searching for schema");
// TODO: get from cache
let result = state
+ .schema_service
.get_schema(&schema_type, &schema_version)
.await
.map_err(|e| match e {
@@ -186,7 +186,8 @@ pub async fn get_schemas(
// TODO: get from cache
let rows = state
- .get_schemas(limit, params.first, cursor)
+ .schema_service
+ .get_schemas(limit, params.first, cursor.as_deref())
.await
.map_err(|e| match e {
api_config::ConfigurationError::Database(ref error) => match error {
diff --git a/warden/src/server/routes/config/schema/update.rs b/warden/src/server/routes/config/schema/update.rs
index f116e0f..aaf1773 100644
--- a/warden/src/server/routes/config/schema/update.rs
+++ b/warden/src/server/routes/config/schema/update.rs
@@ -7,13 +7,12 @@ use axum::{
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
-use warden_core::state::AppState;
-use crate::server::{
+use crate::{server::{
api::version::{Version, VersionPath},
error::AppError,
routes::config::CONFIG,
-};
+}, state::AppState};
/// Update a transaction's schema
#[utoipa::path(
@@ -86,7 +85,8 @@ pub async fn update_schema(
) -> Result<impl IntoResponse, AppError> {
// TODO: should also clear cached ones eventually
let result = state
- .update_schema(schema_type, schema_version, &body)
+ .schema_service
+ .update_schema(&schema_type, &schema_version, &body)
.await?;
if let Some(result) = result {
diff --git a/warden/src/server/routes/transaction_monitoring/mod.rs b/warden/src/server/routes/transaction_monitoring/mod.rs
index 459d75d..da923d4 100644
--- a/warden/src/server/routes/transaction_monitoring/mod.rs
+++ b/warden/src/server/routes/transaction_monitoring/mod.rs
@@ -2,7 +2,8 @@ use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_axum::router::OpenApiRouter;
-use warden_core::state::AppState;
+
+use crate::state::AppState;
pub mod monitor;
diff --git a/warden/src/server/routes/transaction_monitoring/monitor.rs b/warden/src/server/routes/transaction_monitoring/monitor.rs
index 817dbca..dedb6b1 100644
--- a/warden/src/server/routes/transaction_monitoring/monitor.rs
+++ b/warden/src/server/routes/transaction_monitoring/monitor.rs
@@ -1,17 +1,16 @@
use std::sync::Arc;
-use crate::server::{
+use crate::{server::{
api::{
transaction::Transaction,
version::{Version, VersionPath},
},
middleware::extractors::transaction::ValidatedTransaction,
-};
+}, state::AppState};
use axum::{
extract::{Path, State},
http::StatusCode,
};
-use warden_core::state::AppState;
/// Submit a transaction for monitoring
#[utoipa::path(
diff --git a/lib/warden-core/src/state/database.rs b/warden/src/state/database.rs
index 4167424..cc79f13 100644
--- a/lib/warden-core/src/state/database.rs
+++ b/warden/src/state/database.rs
@@ -1,9 +1,9 @@
use sqlx::PgPool;
use tracing::{debug, error};
-use crate::{WardenError, config::cli::database::Database};
+use crate::config::cli::database::Database;
-pub(crate) async fn connect(config: &Database) -> Result<PgPool, WardenError> {
+pub(crate) async fn connect(config: &Database) -> anyhow::Result<PgPool> {
let url = config.get_url()?;
let host = url.host_str();
debug!(host = host, "connecting to database");
diff --git a/warden/src/state/mod.rs b/warden/src/state/mod.rs
new file mode 100644
index 0000000..7dfcdc4
--- /dev/null
+++ b/warden/src/state/mod.rs
@@ -0,0 +1,33 @@
+pub(crate) mod database;
+use std::sync::Arc;
+
+use api_config::schema::SchemaDriver;
+use tracing::{debug, trace};
+use tracing_subscriber::EnvFilter;
+
+use crate::config::Configuration;
+
+pub type LogHandle = tracing_subscriber::reload::Handle<EnvFilter, tracing_subscriber::Registry>;
+
+#[derive(Clone)]
+pub struct AppState {
+ pub log_handle: LogHandle,
+ pub schema_service: Arc<dyn SchemaDriver>,
+}
+
+impl AppState {
+ pub async fn new(
+ log_handle: LogHandle,
+ schema_service: Arc<dyn SchemaDriver>,
+ ) -> anyhow::Result<Self> {
+ // let database = database::connect(&config.database).await?;
+ // trace!("running database migrations");
+ // sqlx::migrate!("../../../migrations").run(&database).await?;
+ // debug!("database up to date");
+
+ Ok(Self {
+ log_handle,
+ schema_service,
+ })
+ }
+}