aboutsummaryrefslogtreecommitdiffstats
path: root/crates/configuration/src/state/rule/query_rule.rs
blob: 145de5cc06901c631bb144ebaac304297ff8d242 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use opentelemetry_semantic_conventions::attribute;
use prost::Message;
use tonic::{Request, Response, Status, async_trait};
use tracing::{Instrument, debug, info_span, instrument, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use warden_core::configuration::rule::{
    GetRuleConfigResponse, RuleConfiguration, RuleConfigurationRequest,
    query_rule_configuration_server::QueryRuleConfiguration,
};
use warden_stack::redis::AsyncCommands;

use crate::state::{AppHandle, cache_key::CacheKey, rule::RuleRow};

#[async_trait]
impl QueryRuleConfiguration for AppHandle {
    #[instrument(skip(self, request), Err(Debug))]
    async fn get_rule_configuration(
        &self,
        request: Request<RuleConfigurationRequest>,
    ) -> Result<Response<GetRuleConfigResponse>, Status> {
        let data = request.into_inner();
        let mut cache = self
            .services
            .cache
            .get()
            .await
            .map_err(|e| tonic::Status::internal(e.to_string()))?;

        let key = CacheKey::from(&data);

        let span = info_span!("cache.get.rule");
        span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
        span.set_attribute(attribute::DB_OPERATION_NAME, "get");
        span.set_attribute("otel.kind", "client");

        let configuration = cache
            .get::<_, Vec<u8>>(&key)
            .instrument(span)
            .await
            .map(|value| {
                if !value.is_empty() {
                    RuleConfiguration::decode(value.as_ref()).ok()
                } else {
                    None
                }
            });

        if let Ok(Some(rule_config)) = configuration {
            return Ok(tonic::Response::new(GetRuleConfigResponse {
                configuration: Some(rule_config),
            }));
        }

        let span = info_span!("get.rule");
        span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres");
        span.set_attribute(attribute::DB_OPERATION_NAME, "select");
        span.set_attribute(attribute::DB_COLLECTION_NAME, "rule");
        span.set_attribute("otel.kind", "client");

        let config = sqlx::query_as!(
            RuleRow,
            r#"select uuid, version, id, configuration as "configuration: sqlx::types::Json<RuleConfiguration>" from rule where
            id = $1 and version = $2"#,
            data.id,
            data.version,
        )
        .fetch_optional(&self.services.postgres)
        .instrument(span)
        .await.map_err(|e| tonic::Status::internal(e.to_string()))?;

        let config = config.map(|transaction| {
            debug!(id = ?transaction.id, "found config");
            transaction.configuration.0
        });

        match config {
            Some(config) => {
                let bytes = config.encode_to_vec();
                let span = info_span!("cache.set");
                span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey");
                span.set_attribute(attribute::DB_OPERATION_NAME, "set");
                span.set_attribute("otel.kind", "client");

                if let Err(e) = cache.set::<_, _, ()>(&key, bytes).instrument(span).await {
                    warn!("{e}");
                };

                Ok(tonic::Response::new(GetRuleConfigResponse {
                    configuration: Some(config),
                }))
            }
            None => Ok(tonic::Response::new(GetRuleConfigResponse {
                configuration: None,
            })),
        }
    }
}