aboutsummaryrefslogtreecommitdiffstats
path: root/crates/router/src/processor/reload.rs
blob: c75465c2c3a8cc8dc50d65f28982a0d6e8300f68 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
use futures_util::StreamExt;
use async_nats::jetstream::consumer;
use tracing::{trace, debug, error, info};
use uuid::Uuid;
use warden_core::configuration::ReloadEvent;

use crate::state::AppHandle;

pub async fn reload(state: AppHandle) -> anyhow::Result<()> {
    let id = Uuid::now_v7().to_string();
    info!(durable = id, "listening for configuration changes");

    let durable = &id;
    let consumer = state
        .services
        .jetstream
        .get_stream(state.config.nats.config.stream.to_string())
        .await?
        .get_or_create_consumer(
            durable,
            consumer::pull::Config {
                durable_name: Some(durable.to_string()),
                filter_subject: state.config.nats.config.reload_subject.to_string(),
                deliver_policy: consumer::DeliverPolicy::LastPerSubject,
                ..Default::default()
            },
        )
        .await?;

    let mut messages = consumer.messages().await?;
    while let Some(value) = messages.next().await {
        match value {
            Ok(message) => {
                trace!("got reload cache event",);
                if let Ok(Some(event)) = String::from_utf8(message.payload.to_vec())
                    .map(|value| ReloadEvent::from_str_name(&value))
                {
                    match event {
                        ReloadEvent::Routing => {
                            let local_cache = state.local_cache.write().await;
                            local_cache.invalidate_all();
                            let _ = message.ack().await.inspect_err(|e| error!("{e}"));
                        }
                        _ => {
                            debug!(event = ?event, "detected reload event, acknowledging");
                            let _ = message.ack().await.inspect_err(|e| error!("{e}"));
                        }
                    }
                }
            }
            Err(e) => {
                error!("{e}")
            }
        }
    }

    Ok(())
}