1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
use opentelemetry_semantic_conventions::attribute;
use prost::Message;
use tonic::{Request, Status};
use tracing::{Instrument, debug, info_span, instrument, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use warden_stack::redis::AsyncCommands;
use uuid::Uuid;
use warden_core::{
configuration::routing::{
GetActiveRoutingResponse, RoutingConfiguration, query_routing_server::QueryRouting,
},
google,
};
use crate::state::{AppHandle, cache_key::CacheKey};
/// Row shape that the active-routing `select` in this file is mapped into via `sqlx::query_as!`.
pub struct RoutingRow {
/// Value of the selected `id` column; only used for debug logging in this file.
id: Uuid,
/// Value of the selected `configuration` column, wrapped in sqlx's `Json` type
/// around a [`RoutingConfiguration`].
configuration: sqlx::types::Json<RoutingConfiguration>,
}
#[tonic::async_trait]
impl QueryRouting for AppHandle {
#[instrument(skip(self, _request), Err(Debug))]
async fn get_active_routing_configuration(
&self,
_request: Request<google::protobuf::Empty>,
) -> Result<tonic::Response<GetActiveRoutingResponse>, Status> {
let mut cache = self
.services
.cache
.get()
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;
let span = info_span!("cache.get");
span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
span.set_attribute(attribute::DB_OPERATION_NAME, "get");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active");
let routing_config = cache
.get::<_, Vec<u8>>(CacheKey::ActiveRouting)
.instrument(span)
.await
.map(|value| {
if !value.is_empty() {
RoutingConfiguration::decode(value.as_ref()).ok()
} else {
None
}
});
if let Ok(Some(routing_config)) = routing_config {
if routing_config.active {
return Ok(tonic::Response::new(GetActiveRoutingResponse {
configuration: Some(routing_config),
}));
}
}
let span = info_span!("db.get.routing.active");
span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
span.set_attribute(attribute::DB_OPERATION_NAME, "select");
span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active");
let config = sqlx::query_as!(
RoutingRow,
r#"select id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>" from routing where
configuration->>'active' = 'true'"#,
)
.fetch_optional(&self.services.postgres)
.instrument(span)
.await.map_err(|e| tonic::Status::internal(e.to_string()))?;
let config = config.map(|transaction| {
debug!(id = ?transaction.id, "found active config");
transaction.configuration.0
});
match config {
Some(config) => {
let bytes = config.encode_to_vec();
let span = info_span!("cache.set");
span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
span.set_attribute(attribute::DB_OPERATION_NAME, "set");
span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active");
if let Err(e) = cache
.set::<_, _, ()>(CacheKey::ActiveRouting, bytes)
.instrument(span)
.await
{
warn!("{e}");
};
Ok(tonic::Response::new(GetActiveRoutingResponse {
configuration: Some(config),
}))
}
None => Ok(tonic::Response::new(GetActiveRoutingResponse {
configuration: None,
})),
}
}
}
|