blob: 9d3fd0dbbf3f73f96ea2685aa5d58c78ba66d932 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
use opentelemetry_semantic_conventions::attribute;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing::{Instrument, debug, info, info_span, instrument, warn};
use warden_core::{configuration::routing::RoutingConfiguration, google };
use crate::{cnfg::CACHE_KEY, state::AppHandle};
#[instrument(skip(state))]
pub async fn get_routing_config(state: AppHandle) -> Option<RoutingConfiguration> {
debug!("getting routing config");
{
let span = info_span!("local_cache.get");
span.set_attribute(attribute::DB_SYSTEM_NAME, "moka");
let local_cache = state.local_cache.read().await;
if let Some(value) = local_cache.get(&CACHE_KEY).await {
return Some(value);
}
}
let mut client = state.query_routing_client.clone();
let span = info_span!("get.routing.config");
span.set_attribute(attribute::RPC_SERVICE, env!("CARGO_PKG_NAME"));
span.set_attribute("otel.kind", "client");
if let Ok(config) = client
.get_active_routing_configuration(google::protobuf::Empty::default())
.instrument(span)
.await
{
debug!("fetched routing config");
let span = info_span!("local_cache.insert");
span.set_attribute(attribute::DB_SYSTEM_NAME, "moka");
if let Some(config) = config.into_inner().configuration {
debug!("updating cache");
let local_cache = state.local_cache.write().await;
local_cache
.insert(CACHE_KEY, config.clone())
.instrument(span)
.await;
info!("cache refreshed");
return Some(config);
} else {
warn!("no routing config is active");
return None;
}
} else {
warn!("no routing config is active");
return None;
}
}
|