aboutsummaryrefslogtreecommitdiffstats
path: root/crates/router/src/state.rs
blob: 4ede2de1c7158c3818cf13c2a72c12a315160273 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
use std::sync::Arc;

use async_nats::jetstream::Context;
use moka::future::Cache;
use tokio::sync::RwLock;
use tonic::transport::Endpoint;
use tracing::error;
use warden_core::configuration::routing::{
    RoutingConfiguration, query_routing_client::QueryRoutingClient,
};
use warden_stack::Configuration;

use crate::{
    cnfg::LocalConfig,
    processor::grpc::interceptor::{Intercepted, MyInterceptor},
};

/// External service handles that are passed into [`AppState::new`] and stored
/// on the resulting state.
#[derive(Clone)]
pub struct Services {
    /// NATS JetStream context (`async_nats::jetstream::Context`).
    pub jetstream: Context,
}

/// Reference-counted handle to an [`AppState`].
pub type AppHandle = Arc<AppState>;

/// State assembled once by [`AppState::new`]: service handles, the local
/// configuration, a routing-configuration cache and a gRPC query client.
#[derive(Clone)]
pub struct AppState {
    /// Service handles supplied by the caller of [`AppState::new`].
    pub services: Services,
    /// Cache of [`RoutingConfiguration`] values keyed by `i32`, wrapped in an
    /// async `RwLock` behind an `Arc`.
    ///
    /// NOTE(review): what the `i32` key identifies is not visible in this file.
    /// NOTE(review): `moka::future::Cache` is presumably already safe to share
    /// and clone on its own, so the outer `Arc<RwLock<..>>` may be redundant —
    /// confirm against the call sites before changing the field type.
    pub local_cache: Arc<RwLock<Cache<i32, RoutingConfiguration>>>,
    /// Local settings deserialized from the `misc` section of the
    /// `warden_stack::Configuration` passed to [`AppState::new`].
    pub config: LocalConfig,
    /// gRPC client for routing queries, built over a channel to
    /// `config.config_endpoint` with `MyInterceptor` attached.
    pub query_routing_client: QueryRoutingClient<Intercepted>,
}

impl AppState {
    /// Builds the application state from the supplied services and configuration.
    ///
    /// The `misc` section of `configuration` is deserialized into a
    /// [`LocalConfig`], a gRPC channel is opened to `config_endpoint`, and a
    /// [`QueryRoutingClient`] wrapped with `MyInterceptor` is created on top of
    /// it. The routing cache starts out empty and is built with the cache
    /// builder's default settings.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `configuration.misc` cannot be deserialized into [`LocalConfig`];
    /// - `config_endpoint` is not accepted by [`Endpoint::new`];
    /// - the connection to the endpoint fails (the failure is also logged at
    ///   `error` level, together with the endpoint, before being returned).
    pub async fn new(services: Services, configuration: Configuration) -> anyhow::Result<Self> {
        // `configuration` is owned and not used again below, so the JSON value
        // is moved into the deserializer instead of being deep-cloned first.
        let config: LocalConfig = serde_json::from_value(configuration.misc)?;

        let channel = Endpoint::new(config.config_endpoint.to_string())?
            .connect()
            .await
            .inspect_err(|e| {
                // Log here so the endpoint is recorded next to the failure;
                // the error itself is still propagated to the caller.
                error!(
                    endpoint = ?config.config_endpoint,
                    "could not connect to config service: {e}",
                )
            })?;

        let query_routing_client = QueryRoutingClient::with_interceptor(channel, MyInterceptor);

        Ok(Self {
            services,
            config,
            local_cache: Arc::new(RwLock::new(Cache::builder().build())),
            query_routing_client,
        })
    }
}