aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-15 19:36:22 +0200
committerrtkay123 <dev@kanjala.com>2025-08-15 19:36:22 +0200
commit1968002d656383069a386bd874c9f0cc83e3116e (patch)
tree3f37092facf20b1176313428ee6269878529278f
parentf5ba1a25cad80bff8c6e01f8d956e212be097ae7 (diff)
downloadwarden-1968002d656383069a386bd874c9f0cc83e3116e.tar.bz2
warden-1968002d656383069a386bd874c9f0cc83e3116e.zip
feat(rule-exec): receive messages
-rw-r--r--Cargo.lock29
-rw-r--r--Cargo.toml1
-rw-r--r--crates/configuration/src/server.rs3
-rw-r--r--crates/configuration/src/server/grpc_svc.rs24
-rw-r--r--crates/router/Cargo.toml3
-rw-r--r--crates/router/src/processor.rs26
-rw-r--r--crates/router/src/processor/grpc.rs27
-rw-r--r--crates/router/src/state.rs6
-rw-r--r--crates/rule-executor/Cargo.toml45
-rw-r--r--crates/rule-executor/rule-executor.toml33
-rw-r--r--crates/rule-executor/src/cnfg.rs26
-rw-r--r--crates/rule-executor/src/main.rs60
-rw-r--r--crates/rule-executor/src/processor.rs114
-rw-r--r--crates/rule-executor/src/processor/publish.rs1
-rw-r--r--crates/rule-executor/src/processor/reload.rs59
-rw-r--r--crates/rule-executor/src/processor/rule.rs1
-rw-r--r--crates/rule-executor/src/state.rs54
-rw-r--r--crates/warden/src/server.rs1
-rw-r--r--crates/warden/src/state.rs7
-rw-r--r--lib/warden-core/src/configuration/conv.rs2
-rw-r--r--lib/warden-middleware/Cargo.toml2
-rw-r--r--lib/warden-middleware/src/grpc.rs (renamed from crates/warden/src/server/grpc.rs)0
-rw-r--r--lib/warden-middleware/src/lib.rs1
23 files changed, 453 insertions, 72 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 4fcbeae..9410446 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2449,6 +2449,32 @@ dependencies = [
]
[[package]]
+name = "rule-executor"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "async-nats",
+ "clap",
+ "config",
+ "futures-util",
+ "moka",
+ "opentelemetry",
+ "opentelemetry-semantic-conventions",
+ "prost 0.14.1",
+ "serde",
+ "serde_json",
+ "time",
+ "tokio",
+ "tonic 0.14.1",
+ "tracing",
+ "tracing-opentelemetry",
+ "uuid",
+ "warden-core",
+ "warden-middleware",
+ "warden-stack",
+]
+
+[[package]]
name = "rust-embed"
version = "8.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3994,8 +4020,10 @@ dependencies = [
"axum",
"metrics",
"metrics-exporter-prometheus",
+ "tonic 0.14.1",
"tower-http",
"tracing",
+ "warden-stack",
]
[[package]]
@@ -4040,6 +4068,7 @@ dependencies = [
"tracing-opentelemetry",
"uuid",
"warden-core",
+ "warden-middleware",
"warden-stack",
]
diff --git a/Cargo.toml b/Cargo.toml
index fee837d..3da737b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,7 @@ config = { version = "0.15.13", default-features = false }
futures-util = { version = "0.3.31", default-features = false }
metrics = { version = "0.24.2", default-features = false }
metrics-exporter-prometheus = { version = "0.17.2", default-features = false }
+moka = "0.12.10"
opentelemetry = { version = "0.30.0", default-features = false }
opentelemetry-http = "0.30.0"
opentelemetry-otlp = { version = "0.30.0", default-features = false }
diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs
index 28d6dd8..e31fc60 100644
--- a/crates/configuration/src/server.rs
+++ b/crates/configuration/src/server.rs
@@ -1,10 +1,8 @@
pub mod error;
-pub mod grpc_svc;
mod http_svc;
pub mod reload_stream;
mod version;
-use grpc_svc::interceptor::MyInterceptor;
use http_svc::build_router;
use tonic::service::Routes;
use tower_http::trace::TraceLayer;
@@ -20,6 +18,7 @@ use warden_core::{
},
},
};
+use warden_middleware::grpc::interceptor::MyInterceptor;
use crate::{server::error::AppError, state::AppHandle};
diff --git a/crates/configuration/src/server/grpc_svc.rs b/crates/configuration/src/server/grpc_svc.rs
deleted file mode 100644
index 42aa871..0000000
--- a/crates/configuration/src/server/grpc_svc.rs
+++ /dev/null
@@ -1,24 +0,0 @@
-pub mod interceptor {
- use opentelemetry::global;
- use tonic::{Status, service::Interceptor};
- use tracing::Span;
- use tracing_opentelemetry::OpenTelemetrySpanExt;
- use warden_stack::tracing::telemetry::tonic::extractor;
-
- #[derive(Clone, Copy)]
- pub struct MyInterceptor;
-
- impl Interceptor for MyInterceptor {
- fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
- let span = Span::current();
-
- let cx = global::get_text_map_propagator(|propagator| {
- propagator.extract(&extractor::MetadataMap(request.metadata()))
- });
-
- span.set_parent(cx);
-
- Ok(request)
- }
- }
-}
diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml
index 0b7b232..08ccf71 100644
--- a/crates/router/Cargo.toml
+++ b/crates/router/Cargo.toml
@@ -14,7 +14,7 @@ bytes = "1.10.1"
clap = { workspace = true, features = ["derive"] }
config = { workspace = true, features = ["convert-case", "toml"] }
futures-util.workspace = true
-moka = { version = "0.12.10", features = ["future"] }
+moka = { workspace = true, features = ["future"] }
opentelemetry.workspace = true
opentelemetry-semantic-conventions.workspace = true
prost.workspace = true
@@ -34,6 +34,7 @@ warden-core = { workspace = true, features = [
"configuration",
"serde-time"
] }
+warden-middleware.workspace = true
[dependencies.warden-stack]
workspace = true
diff --git a/crates/router/src/processor.rs b/crates/router/src/processor.rs
index 9afe726..c593393 100644
--- a/crates/router/src/processor.rs
+++ b/crates/router/src/processor.rs
@@ -1,4 +1,3 @@
-pub mod grpc;
mod load;
mod publish;
mod reload;
@@ -11,7 +10,7 @@ use async_nats::jetstream::{
Context,
consumer::{Consumer, pull},
};
-use futures_util::StreamExt;
+use futures_util::{StreamExt, future};
use tokio::signal;
use tracing::{error, trace};
use warden_stack::{Configuration, tracing::SdkTracerProvider};
@@ -46,11 +45,24 @@ async fn run(state: AppHandle) -> anyhow::Result<()> {
let consumer = consumer?;
// Consume messages from the consumer
- while let Some(Ok(message)) = consumer.messages().await?.next().await {
- if let Err(e) = route::route(message, Arc::clone(&state)).await {
- error!("{}", e.to_string());
- }
- }
+
+ let limit = None;
+
+ consumer
+ .messages()
+ .await?
+ .for_each_concurrent(limit, |message| {
+ let state = Arc::clone(&state);
+ tokio::spawn(async move {
+ if let Ok(message) = message
+ && let Err(e) = route::route(message, Arc::clone(&state)).await
+ {
+ error!("{}", e.to_string());
+ }
+ });
+ future::ready(())
+ })
+ .await;
Ok(())
}
diff --git a/crates/router/src/processor/grpc.rs b/crates/router/src/processor/grpc.rs
deleted file mode 100644
index 344f2a1..0000000
--- a/crates/router/src/processor/grpc.rs
+++ /dev/null
@@ -1,27 +0,0 @@
-pub mod interceptor {
- use opentelemetry::global;
- use tonic::{
- Status,
- service::{Interceptor, interceptor::InterceptedService},
- transport::Channel,
- };
- use tracing::Span;
- use tracing_opentelemetry::OpenTelemetrySpanExt;
- use warden_stack::tracing::telemetry::tonic::injector;
-
- pub type Intercepted = InterceptedService<Channel, MyInterceptor>;
-
- #[derive(Clone, Copy)]
- pub struct MyInterceptor;
-
- impl Interceptor for MyInterceptor {
- fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
- let cx = Span::current().context();
- global::get_text_map_propagator(|propagator| {
- propagator.inject_context(&cx, &mut injector::MetadataMap(request.metadata_mut()))
- });
-
- Ok(request)
- }
- }
-}
diff --git a/crates/router/src/state.rs b/crates/router/src/state.rs
index 4ede2de..a0d9228 100644
--- a/crates/router/src/state.rs
+++ b/crates/router/src/state.rs
@@ -8,12 +8,10 @@ use tracing::error;
use warden_core::configuration::routing::{
RoutingConfiguration, query_routing_client::QueryRoutingClient,
};
+use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor};
use warden_stack::Configuration;
-use crate::{
- cnfg::LocalConfig,
- processor::grpc::interceptor::{Intercepted, MyInterceptor},
-};
+use crate::cnfg::LocalConfig;
#[derive(Clone)]
pub struct Services {
diff --git a/crates/rule-executor/Cargo.toml b/crates/rule-executor/Cargo.toml
new file mode 100644
index 0000000..3bb9561
--- /dev/null
+++ b/crates/rule-executor/Cargo.toml
@@ -0,0 +1,45 @@
+[package]
+name = "rule-executor"
+version = "0.1.0"
+edition = "2024"
+license.workspace = true
+homepage.workspace = true
+documentation.workspace = true
+description.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+async-nats.workspace = true
+clap = { workspace = true, features = ["derive"] }
+config = { workspace = true, features = ["convert-case", "toml"] }
+futures-util.workspace = true
+moka = { workspace = true, features = ["future"] }
+opentelemetry.workspace = true
+opentelemetry-semantic-conventions.workspace = true
+prost.workspace = true
+serde = { workspace = true, features = ["derive"] }
+serde_json.workspace = true
+time = { workspace = true, features = ["serde"] }
+tokio = { workspace = true, features = [
+ "macros",
+ "rt-multi-thread",
+ "signal",
+] }
+tonic.workspace = true
+tracing.workspace = true
+tracing-opentelemetry.workspace = true
+uuid = { workspace = true, features = ["v7"] }
+warden-core = { workspace = true, features = [
+ "message",
+ "configuration",
+ "serde",
+ "time",
+] }
+warden-stack = { workspace = true, features = [
+ "nats-jetstream",
+ "opentelemetry",
+ "postgres",
+ "opentelemetry-tonic",
+ "tracing-loki",
+] }
+warden-middleware.workspace = true
diff --git a/crates/rule-executor/rule-executor.toml b/crates/rule-executor/rule-executor.toml
new file mode 100644
index 0000000..dd64828
--- /dev/null
+++ b/crates/rule-executor/rule-executor.toml
@@ -0,0 +1,33 @@
+[application]
+env = "development"
+
+[monitoring]
+log-level = "warden_rule=trace,info"
+opentelemetry-endpoint = "http://localhost:4317"
+loki-endpoint = "http://localhost:3100"
+
+[misc]
+config-endpoint = "http://localhost:1304"
+
+[misc.nats]
+stream-name = "rules"
+subjects = ["rule.>"]
+durable-name = "rules"
+destination-prefix = "typology-rule"
+
+[misc.nats.config]
+stream = "configuration"
+reload-subject = "configuration.reload"
+
+[database]
+pool_size = 100
+port = 5432
+name = "pseudonyms"
+host = "localhost"
+password = "password"
+user = "postgres"
+
+[nats]
+hosts = ["nats://localhost:4222"]
+
+# vim:ft=toml
diff --git a/crates/rule-executor/src/cnfg.rs b/crates/rule-executor/src/cnfg.rs
new file mode 100644
index 0000000..eac1c2d
--- /dev/null
+++ b/crates/rule-executor/src/cnfg.rs
@@ -0,0 +1,26 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct LocalConfig {
+ pub config_endpoint: Arc<str>,
+ pub nats: Nats,
+}
+
+#[derive(Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub struct Nats {
+ pub name: Arc<str>,
+ pub subjects: Arc<[String]>,
+ pub durable_name: Arc<str>,
+ pub destination_prefix: Arc<str>,
+ pub config: ConfigNats,
+}
+
+#[derive(Deserialize, Clone)]
+pub struct ConfigNats {
+ pub stream: Arc<str>,
+ pub reload_subject: Arc<str>,
+}
diff --git a/crates/rule-executor/src/main.rs b/crates/rule-executor/src/main.rs
new file mode 100644
index 0000000..18c9222
--- /dev/null
+++ b/crates/rule-executor/src/main.rs
@@ -0,0 +1,60 @@
+mod cnfg;
+mod processor;
+mod state;
+
+use anyhow::Result;
+use clap::Parser;
+use tracing::error;
+use warden_stack::{Configuration, Services, tracing::Tracing};
+
+/// rule-executor
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// Path to config file
+ #[arg(short, long)]
+ config_file: Option<std::path::PathBuf>,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let args = Args::parse();
+ let config = include_str!("../rule-executor.toml");
+
+ let mut config = config::Config::builder()
+ .add_source(config::File::from_str(config, config::FileFormat::Toml));
+
+ if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) {
+ config = config.add_source(config::File::new(cf, config::FileFormat::Toml));
+ };
+
+ let mut config: Configuration = config.build()?.try_deserialize()?;
+ config.application.name = env!("CARGO_CRATE_NAME").into();
+ config.application.version = env!("CARGO_PKG_VERSION").into();
+
+ let tracing = Tracing::builder()
+ .opentelemetry(&config.application, &config.monitoring)?
+ .loki(&config.application, &config.monitoring)?
+ .build(&config.monitoring);
+
+ let provider = tracing.otel_provider;
+
+ tokio::spawn(tracing.loki_task);
+
+ let mut services = Services::builder()
+ .nats_jetstream(&config.nats)
+ .await
+ .inspect_err(|e| error!("nats: {e}"))?
+ .build();
+
+ let jetstream = services
+ .jetstream
+ .take()
+ .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?;
+
+ let services = state::Services { jetstream };
+
+ processor::serve(services, config, provider)
+ .await
+ .inspect_err(|e| error!("{e}"))
+}
diff --git a/crates/rule-executor/src/processor.rs b/crates/rule-executor/src/processor.rs
new file mode 100644
index 0000000..67d0d15
--- /dev/null
+++ b/crates/rule-executor/src/processor.rs
@@ -0,0 +1,114 @@
+mod publish;
+mod reload;
+mod rule;
+
+use std::sync::Arc;
+
+use anyhow::Result;
+use async_nats::jetstream::{
+ Context,
+ consumer::{Consumer, pull},
+};
+use futures_util::{future, StreamExt};
+use tokio::signal;
+use tracing::trace;
+use warden_stack::{Configuration, tracing::SdkTracerProvider};
+
+use crate::{
+ cnfg::Nats,
+ state::{AppHandle, AppState, Services},
+};
+
+pub async fn serve(
+ services: Services,
+ config: Configuration,
+ provider: SdkTracerProvider,
+) -> anyhow::Result<()> {
+ let state = Arc::new(AppState::new(services, config).await?);
+
+ tokio::select! {
+ _ = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {}
+ _ = shutdown_signal(provider) => {}
+ };
+
+ Ok(())
+}
+
+async fn run(state: AppHandle) -> anyhow::Result<()> {
+ let config = Arc::clone(&state);
+ let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;
+
+ let limit = None;
+
+ consumer
+ .messages()
+ .await?
+ .for_each_concurrent(limit, |message| {
+ let state = Arc::clone(&state);
+ // tokio::spawn(async move {
+ // if let Ok(message) = message
+ // && let Err(e) = route::route(message, Arc::clone(&state)).await
+ // {
+ // error!("{}", e.to_string());
+ // }
+ // });
+ future::ready(())
+ })
+ .await;
+
+ Ok(())
+}
+
+async fn get_or_create_stream(
+ jetstream: &Context,
+ nats: &Nats,
+) -> anyhow::Result<Consumer<pull::Config>> {
+ trace!(name = ?nats.name, "getting or creating stream");
+ let stream = jetstream
+ .get_or_create_stream(async_nats::jetstream::stream::Config {
+ name: format!("{}.v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")),
+ subjects: nats.subjects.iter().map(Into::into).collect(),
+ ..Default::default()
+ })
+ .await?;
+ let durable = nats.durable_name.to_string();
+ // Get or create a pull-based consumer
+ Ok(stream
+ .get_or_create_consumer(
+ durable.as_ref(),
+ async_nats::jetstream::consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ ..Default::default()
+ },
+ )
+ .await?)
+}
+
+async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
+ let ctrl_c = async {
+ signal::ctrl_c()
+ .await
+ .expect("failed to install Ctrl+C handler");
+ };
+
+ #[cfg(unix)]
+ let terminate = async {
+ signal::unix::signal(signal::unix::SignalKind::terminate())
+ .expect("failed to install signal handler")
+ .recv()
+ .await;
+ };
+
+ #[cfg(not(unix))]
+ let terminate = std::future::pending::<()>();
+
+ tokio::select! {
+ _ = ctrl_c => {
+ },
+ _ = terminate => {
+ },
+ }
+ let _ = provider.shutdown();
+
+ Ok(())
+}
diff --git a/crates/rule-executor/src/processor/publish.rs b/crates/rule-executor/src/processor/publish.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/crates/rule-executor/src/processor/publish.rs
@@ -0,0 +1 @@
+
diff --git a/crates/rule-executor/src/processor/reload.rs b/crates/rule-executor/src/processor/reload.rs
new file mode 100644
index 0000000..a111948
--- /dev/null
+++ b/crates/rule-executor/src/processor/reload.rs
@@ -0,0 +1,59 @@
+use async_nats::jetstream::consumer;
+use futures_util::StreamExt;
+use tracing::{debug, error, info};
+use uuid::Uuid;
+use warden_core::configuration::ReloadEvent;
+
+use crate::state::AppHandle;
+
+pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
+ let id = Uuid::now_v7().to_string();
+ info!(durable = id, "listening for configuration changes");
+
+ let durable = &id;
+ let consumer = state
+ .services
+ .jetstream
+ .get_stream(state.config.nats.config.stream.to_string())
+ .await?
+ .get_or_create_consumer(
+ durable,
+ consumer::pull::Config {
+ durable_name: Some(durable.to_string()),
+ filter_subject: state.config.nats.config.reload_subject.to_string(),
+ deliver_policy: consumer::DeliverPolicy::LastPerSubject,
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let mut messages = consumer.messages().await?;
+ while let Some(value) = messages.next().await {
+ match value {
+ Ok(message) => {
+ debug!("got reload cache event",);
+ if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec())
+ .map(|value| ReloadEvent::from_str_name(&value))
+ {
+ match event {
+ // TODO: find exact rule
+ ReloadEvent::Rule => {
+ let local_cache = state.local_cache.write().await;
+ local_cache.invalidate_all();
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ _ => {
+ debug!(event = ?event, "detected reload event, acknowledging");
+ let _ = message.ack().await.inspect_err(|e| error!("{e}"));
+ }
+ }
+ }
+ }
+ Err(e) => {
+ error!("{e}")
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/crates/rule-executor/src/processor/rule.rs b/crates/rule-executor/src/processor/rule.rs
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/crates/rule-executor/src/processor/rule.rs
@@ -0,0 +1 @@
+
diff --git a/crates/rule-executor/src/state.rs b/crates/rule-executor/src/state.rs
new file mode 100644
index 0000000..ec59519
--- /dev/null
+++ b/crates/rule-executor/src/state.rs
@@ -0,0 +1,54 @@
+use std::sync::Arc;
+
+use async_nats::jetstream::Context;
+use moka::future::Cache;
+use tokio::sync::RwLock;
+use tonic::transport::Endpoint;
+use tracing::error;
+use warden_core::configuration::{
+ routing::{RoutingConfiguration, query_routing_client::QueryRoutingClient},
+ rule::RuleConfigurationRequest,
+};
+use warden_stack::Configuration;
+
+use crate::cnfg::LocalConfig;
+use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor};
+
+#[derive(Clone)]
+pub struct Services {
+ pub jetstream: Context,
+}
+
+pub type AppHandle = Arc<AppState>;
+
+#[derive(Clone)]
+pub struct AppState {
+ pub services: Services,
+ pub local_cache: Arc<RwLock<Cache<RuleConfigurationRequest, RoutingConfiguration>>>,
+ pub config: LocalConfig,
+ pub query_routing_client: QueryRoutingClient<Intercepted>,
+}
+
+impl AppState {
+ pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> {
+ let config: LocalConfig = serde_json::from_value(configuration.misc.clone())?;
+ let channel = Endpoint::new(config.config_endpoint.to_string())?
+ .connect()
+ .await
+ .inspect_err(|e| {
+ error!(
+ endpoint = ?config.config_endpoint,
+ "could not connect to config service: {e}",
+ )
+ })?;
+
+ let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor);
+
+ Ok(Self {
+ services,
+ config,
+ local_cache: Arc::new(RwLock::new(Cache::builder().build())),
+ query_routing_client,
+ })
+ }
+}
diff --git a/crates/warden/src/server.rs b/crates/warden/src/server.rs
index 2db9f0f..760138d 100644
--- a/crates/warden/src/server.rs
+++ b/crates/warden/src/server.rs
@@ -1,4 +1,3 @@
-pub mod grpc;
mod publish;
mod routes;
pub use routes::metrics::metrics_app;
diff --git a/crates/warden/src/state.rs b/crates/warden/src/state.rs
index 628225c..3ebb748 100644
--- a/crates/warden/src/state.rs
+++ b/crates/warden/src/state.rs
@@ -6,11 +6,8 @@ use tracing::error;
use warden_core::pseudonyms::transaction_relationship::mutate_pseudonym_client::MutatePseudonymClient;
use warden_stack::{Configuration, cache::RedisManager};
-use crate::{
- cnfg::LocalConfig,
- error::AppError,
- server::grpc::interceptor::{Intercepted, MyInterceptor},
-};
+use crate::{cnfg::LocalConfig, error::AppError};
+use warden_middleware::grpc::interceptor::{Intercepted, MyInterceptor};
#[derive(Clone)]
pub struct AppHandle(Arc<AppState>);
diff --git a/lib/warden-core/src/configuration/conv.rs b/lib/warden-core/src/configuration/conv.rs
index c5c7768..02f0d27 100644
--- a/lib/warden-core/src/configuration/conv.rs
+++ b/lib/warden-core/src/configuration/conv.rs
@@ -103,7 +103,7 @@ impl serde::Serialize for GenericParameter {
where
S: serde::Serializer,
{
- let json = serde_json::Value::from(self.0.clone());
+ let json = self.0.clone();
json.serialize(serializer)
}
}
diff --git a/lib/warden-middleware/Cargo.toml b/lib/warden-middleware/Cargo.toml
index 97c2c88..c68bc69 100644
--- a/lib/warden-middleware/Cargo.toml
+++ b/lib/warden-middleware/Cargo.toml
@@ -12,8 +12,10 @@ publish = false
axum.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
+tonic.workspace = true
tower-http = { workspace = true, features = [
"request-id",
"trace",
] }
tracing.workspace = true
+warden-stack = { workspace = true, features = ["opentelemetry-tonic"] }
diff --git a/crates/warden/src/server/grpc.rs b/lib/warden-middleware/src/grpc.rs
index f239ddb..f239ddb 100644
--- a/crates/warden/src/server/grpc.rs
+++ b/lib/warden-middleware/src/grpc.rs
diff --git a/lib/warden-middleware/src/lib.rs b/lib/warden-middleware/src/lib.rs
index 6e3a0f4..2fb8df8 100644
--- a/lib/warden-middleware/src/lib.rs
+++ b/lib/warden-middleware/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod grpc;
mod metrics;
mod trace_layer;