aboutsummaryrefslogtreecommitdiffstats
path: root/lib/warden-stack/src/nats.rs
blob: d1811f02d967f03fb5354d7eb3b112bd5cc3c5f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::sync::Arc;

use serde::Deserialize;

#[derive(Deserialize, Clone, Debug, Default)]
/// Nats configuration
pub struct NatsConfig {
    /// Hosts dsn
    #[serde(default = "nats")]
    pub hosts: Arc<[String]>,
}

pub(crate) fn nats() -> Arc<[String]> {
    let hosts = vec!["nats://localhost:4222".to_string()];
    hosts.into()
}

impl NatsConfig {
    fn hosts(&self) -> Vec<String> {
        self.hosts.iter().map(ToString::to_string).collect()
    }
}

use crate::{
    ServiceError, ServicesBuilder,
    services_builder::{IsUnset, State},
};

#[cfg(feature = "nats-jetstream")]
impl<S: State> ServicesBuilder<S> {
    /// create a Jetstream Context using the provided [NatsConfig]
    pub async fn nats_jetstream(
        self,
        config: &NatsConfig,
    ) -> Result<ServicesBuilder<crate::services_builder::SetJetstream<S>>, ServiceError>
    where
        S::Jetstream: IsUnset,
    {
        let hosts = config.hosts();
        let client = async_nats::connect(hosts).await?;

        Ok(self.jetstream_internal(async_nats::jetstream::new(client)))
    }
}

#[cfg(feature = "nats-core")]
impl<S: State> ServicesBuilder<S> {
    /// create a NATS connection using the provided [NatsConfig]
    pub async fn nats(
        self,
        config: &NatsConfig,
    ) -> Result<ServicesBuilder<crate::services_builder::SetNats<S>>, ServiceError>
    where
        S::Nats: IsUnset,
    {
        let hosts = config.hosts();
        let client = async_nats::connect(hosts).await?;

        Ok(self.nats_internal(client))
    }
}