aboutsummaryrefslogtreecommitdiffstats
path: root/crates/configuration/src/state/routing/query_routing.rs
blob: 82c8d1c57c93ce5c135a9d7d0af9a805b42243c3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use opentelemetry_semantic_conventions::attribute;
use prost::Message;
use tonic::{Request, Status};
use tracing::{Instrument, debug, info_span, instrument, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use warden_stack::redis::AsyncCommands;

use uuid::Uuid;
use warden_core::{
    configuration::routing::{
        GetActiveRoutingResponse, RoutingConfiguration, query_routing_server::QueryRouting,
    },
    google,
};

use crate::state::{AppHandle, cache_key::CacheKey};

/// Row shape produced by the `routing` table query in this module
/// (`select id, configuration ... from routing`).
pub struct RoutingRow {
    /// Value of the row's `id` column.
    id: Uuid,
    /// Value of the row's `configuration` column, deserialized from JSON into a
    /// [`RoutingConfiguration`].
    configuration: sqlx::types::Json<RoutingConfiguration>,
}

#[tonic::async_trait]
impl QueryRouting for AppHandle {
    #[instrument(skip(self, _request), Err(Debug))]
    async fn get_active_routing_configuration(
        &self,
        _request: Request<google::protobuf::Empty>,
    ) -> Result<tonic::Response<GetActiveRoutingResponse>, Status> {
        let mut cache = self
            .services
            .cache
            .get()
            .await
            .map_err(|e| tonic::Status::internal(e.to_string()))?;

        let span = info_span!("cache.get");
        span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
        span.set_attribute(attribute::DB_OPERATION_NAME, "get");
        span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active");
        span.set_attribute("otel.kind", "client");

        let routing_config = cache
            .get::<_, Vec<u8>>(CacheKey::ActiveRouting)
            .instrument(span)
            .await
            .map(|value| {
                if !value.is_empty() {
                    RoutingConfiguration::decode(value.as_ref()).ok()
                } else {
                    None
                }
            });

        if let Ok(Some(routing_config)) = routing_config
            && routing_config.active
        {
            return Ok(tonic::Response::new(GetActiveRoutingResponse {
                configuration: Some(routing_config),
            }));
        }

        let span = info_span!("db.get.routing.active");
        span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
        span.set_attribute(attribute::DB_OPERATION_NAME, "select");
        span.set_attribute(attribute::DB_COLLECTION_NAME, "routing");
        span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active");
        span.set_attribute("otel.kind", "client");

        let config = sqlx::query_as!(
            RoutingRow,
            r#"select id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>" from routing where
            configuration->>'active' = 'true'"#,
        )
        .fetch_optional(&self.services.postgres)
        .instrument(span)
        .await.map_err(|e| tonic::Status::internal(e.to_string()))?;

        let config = config.map(|transaction| {
            debug!(id = ?transaction.id, "found active config");
            transaction.configuration.0
        });

        match config {
            Some(config) => {
                let bytes = config.encode_to_vec();
                let span = info_span!("cache.set");
                span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
                span.set_attribute(attribute::DB_OPERATION_NAME, "set");
                span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active");
                span.set_attribute("otel.kind", "client");

                if let Err(e) = cache
                    .set::<_, _, ()>(CacheKey::ActiveRouting, bytes)
                    .instrument(span)
                    .await
                {
                    warn!("{e}");
                };

                Ok(tonic::Response::new(GetActiveRoutingResponse {
                    configuration: Some(config),
                }))
            }
            None => Ok(tonic::Response::new(GetActiveRoutingResponse {
                configuration: None,
            })),
        }
    }
}