aboutsummaryrefslogtreecommitdiffstats
path: root/crates/router/src/processor.rs
blob: b8c69f300566a88f1aaac09695b6b39ffebb835f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
pub mod grpc;
mod reload;
mod route;
mod publish;
mod load;

use std::sync::Arc;

use anyhow::Result;
use async_nats::jetstream::{consumer::{pull, Consumer}, Context};
use tokio::signal;
use tracing::{error, trace};
use warden_stack::{Configuration, tracing::SdkTracerProvider};
use futures_util::StreamExt;

use crate::{cnfg::Nats, state::{AppHandle, AppState, Services}};


/// Builds the shared application state and drives the router until it stops
/// or a shutdown signal is received.
///
/// Two futures are raced against each other:
/// * the configuration reload task joined with the message-consuming loop, and
/// * [`shutdown_signal`], which owns the tracer `provider`.
///
/// Whichever finishes first decides the return value; the other is dropped.
///
/// # Errors
///
/// Returns an error if the application state cannot be created, or if the
/// branch that completes first (reload/run, or the shutdown handler) fails.
pub async fn serve(
    services: Services,
    config: Configuration,
    provider: SdkTracerProvider,
) -> anyhow::Result<()> {
    let state = Arc::new(AppState::new(services, config).await?);

    // Surface the outcome of the finished branch instead of discarding it, so
    // a failed worker is reported to the caller rather than looking like a
    // clean exit.
    tokio::select! {
        outcome = futures_util::future::try_join(reload::reload(Arc::clone(&state)), run(state)) => {
            outcome.map(|_| ())
        }
        outcome = shutdown_signal(provider) => outcome,
    }
}


async fn run(state: AppHandle) -> anyhow::Result<()> {
    let config = Arc::clone(&state);
    let (consumer, _) = tokio::join!(
        get_or_create_stream(&state.services.jetstream, &state.config.nats),
        load::get_routing_config(Arc::clone(&config))
    );

    let consumer = consumer?;

    // Consume messages from the consumer
    while let Some(Ok(message)) = consumer.messages().await?.next().await {
        if let Err(e) = route::route(message, Arc::clone(&state)).await {
            error!("{}", e.to_string());
        }
    }

    Ok(())
}

/// Fetches the JetStream stream described by `nats`, creating it when it does
/// not exist yet, and returns a durable pull consumer bound to that stream.
///
/// The stream is configured with the name, subjects and message limit taken
/// from `nats`; every other stream and consumer option keeps its default.
///
/// # Errors
///
/// Returns an error if either the stream or the consumer request fails.
async fn get_or_create_stream(
    jetstream: &Context,
    nats: &Nats,
) -> anyhow::Result<Consumer<pull::Config>> {
    trace!(name = ?nats.name, "getting or creating stream");

    let stream_config = async_nats::jetstream::stream::Config {
        name: nats.name.to_string(),
        subjects: nats.subjects.iter().map(Into::into).collect(),
        max_messages: nats.max_messages,
        ..Default::default()
    };
    let stream = jetstream.get_or_create_stream(stream_config).await?;

    // The durable name is used both as the lookup key and inside the
    // consumer configuration itself.
    let durable_name = nats.durable_name.to_string();
    let consumer_config = pull::Config {
        durable_name: Some(durable_name.clone()),
        ..Default::default()
    };

    let consumer = stream
        .get_or_create_consumer(&durable_name, consumer_config)
        .await?;

    Ok(consumer)
}


/// Waits for a termination request and then shuts down the tracer provider.
///
/// Completes on Ctrl+C on every platform and, on Unix, also on `SIGTERM`.
/// The result of `provider.shutdown()` is intentionally ignored.
///
/// # Panics
///
/// Panics if a signal handler cannot be installed.
async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
    // Resolves once the process receives Ctrl+C.
    let interrupt = async {
        let received = signal::ctrl_c().await;
        received.expect("failed to install Ctrl+C handler");
    };

    // Resolves once the process receives SIGTERM (Unix only).
    #[cfg(unix)]
    let sigterm = async {
        let mut stream = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler");
        stream.recv().await;
    };

    // Other platforms have no SIGTERM: use a future that never completes so
    // only Ctrl+C can end the wait.
    #[cfg(not(unix))]
    let sigterm = std::future::pending::<()>();

    tokio::select! {
        _ = interrupt => {}
        _ = sigterm => {}
    }

    // Best-effort shutdown of the tracer provider; a failure here is ignored.
    let _ = provider.shutdown();

    Ok(())
}