aboutsummaryrefslogtreecommitdiffstats
path: root/crates/configuration/src/state
diff options
context:
space:
mode:
Diffstat (limited to 'crates/configuration/src/state')
-rw-r--r--crates/configuration/src/state/routing.rs3
-rw-r--r--crates/configuration/src/state/routing/mutate_routing.rs150
-rw-r--r--crates/configuration/src/state/routing/query_routing.rs (renamed from crates/configuration/src/state/routing/query.rs)0
3 files changed, 152 insertions, 1 deletions
diff --git a/crates/configuration/src/state/routing.rs b/crates/configuration/src/state/routing.rs
index ea51c17..d71af04 100644
--- a/crates/configuration/src/state/routing.rs
+++ b/crates/configuration/src/state/routing.rs
@@ -1 +1,2 @@
-mod query;
+mod mutate_routing;
+mod query_routing;
diff --git a/crates/configuration/src/state/routing/mutate_routing.rs b/crates/configuration/src/state/routing/mutate_routing.rs
new file mode 100644
index 0000000..0c0637a
--- /dev/null
+++ b/crates/configuration/src/state/routing/mutate_routing.rs
@@ -0,0 +1,150 @@
+use opentelemetry_semantic_conventions::attribute;
+use tonic::{Request, Response, Status, async_trait};
+use tracing::{Instrument, error, info_span, instrument};
+use tracing_opentelemetry::OpenTelemetrySpanExt;
+use uuid::Uuid;
+use warden_core::configuration::{
+ ReloadEvent,
+ routing::{
+ DeleteConfigurationRequest, RoutingConfiguration, UpdateRoutingRequest,
+ mutate_routing_server::MutateRouting,
+ },
+};
+
+use crate::state::{AppHandle, cache_key::CacheKey, invalidate_cache, publish_reload};
+
+#[allow(dead_code)]
+struct RoutingRow {
+ id: Uuid,
+ configuration: sqlx::types::Json<RoutingConfiguration>,
+}
+
+#[async_trait]
+impl MutateRouting for AppHandle {
+ #[instrument(skip(self, request))]
+ async fn create_routing_configuration(
+ &self,
+ request: Request<RoutingConfiguration>,
+ ) -> Result<Response<RoutingConfiguration>, Status> {
+ let request = request.into_inner();
+ let span = info_span!("create.configuration.routing");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "insert");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+
+ sqlx::query!(
+ "insert into routing (id, configuration) values ($1, $2)",
+ Uuid::now_v7(),
+ sqlx::types::Json(&request) as _
+ )
+ .execute(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ Ok(tonic::Response::new(request))
+ }
+
+ async fn update_routing_configuration(
+ &self,
+ request: Request<UpdateRoutingRequest>,
+ ) -> Result<Response<RoutingConfiguration>, Status> {
+ let conf = self
+ .app_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .expect("bad config");
+
+ let request = request.into_inner();
+ let id = Uuid::parse_str(&request.id)
+ .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?;
+
+ let config = request.configuration.expect("configuration to be provided");
+
+ let span = info_span!("update.configuration.routing");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "update");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+
+ let updated = sqlx::query_as!(
+ RoutingRow,
+ r#"
+ update routing
+ set configuration = $1
+ where id = $2
+ returning id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>"
+ "#,
+ sqlx::types::Json(&config) as _,
+ id
+ )
+ .fetch_one(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ let (_del_result, _publish_result) = tokio::try_join!(
+ invalidate_cache(self, CacheKey::Routing(&id)),
+ publish_reload(self, conf, ReloadEvent::Routing)
+ )?;
+
+ let res = updated.configuration.0;
+
+ Ok(Response::new(res))
+ }
+
+ async fn delete_routing_configuration(
+ &self,
+ request: Request<DeleteConfigurationRequest>,
+ ) -> Result<Response<RoutingConfiguration>, Status> {
+ let conf = self
+ .app_config
+ .nats
+ .subject
+ .split(".")
+ .next()
+ .expect("bad config");
+
+ let request = request.into_inner();
+ let id = Uuid::parse_str(&request.id)
+ .map_err(|_e| tonic::Status::invalid_argument("id is not a uuid"))?;
+
+ let span = info_span!("delete.configuration.routing");
+ span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
+ span.set_attribute(attribute::DB_OPERATION_NAME, "delete");
+ span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
+
+ let updated = sqlx::query_as!(
+ RoutingRow,
+ r#"
+ delete from routing
+ where id = $1
+ returning id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>"
+ "#,
+ id
+ )
+ .fetch_one(&self.services.postgres)
+ .instrument(span)
+ .await
+ .map_err(|e| {
+ error!("{e}");
+ tonic::Status::internal(e.to_string())
+ })?;
+
+ let (_del_result, _publish_result) = tokio::try_join!(
+ invalidate_cache(self, CacheKey::Routing(&id)),
+ publish_reload(self, conf, ReloadEvent::Routing)
+ )?;
+
+ let res = updated.configuration.0;
+
+ Ok(Response::new(res))
+ }
+}
diff --git a/crates/configuration/src/state/routing/query.rs b/crates/configuration/src/state/routing/query_routing.rs
index 3c6814d..3c6814d 100644
--- a/crates/configuration/src/state/routing/query.rs
+++ b/crates/configuration/src/state/routing/query_routing.rs