aboutsummaryrefslogtreecommitdiffstats
path: root/crates/aggregator/src/processor.rs
blob: 3a7c8ac3ec9650171dbd6d590387a2a2fadaedfb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
mod aggregate;

use anyhow::Result;
use async_nats::{
    self,
    jetstream::{
        Context,
        consumer::{Consumer, pull::Config},
    },
};
use futures_util::{StreamExt as _, future};
use tokio::signal;
use tracing::{debug, error, info};
use warden_stack::tracing::SdkTracerProvider;

use crate::{cnfg::NatsConfig, state::AppHandle};

/// Runs the message processor until it stops or a shutdown signal arrives,
/// whichever happens first.
///
/// `state` is handed to the processing loop; `provider` is handed to the
/// shutdown handler, which shuts it down once a signal has been received.
///
/// # Errors
///
/// Returns the error of whichever branch completed first. In particular a
/// failure inside the processing loop (for example, being unable to obtain the
/// consumer) is reported to the caller instead of being turned into `Ok(())`.
pub async fn serve(state: AppHandle, provider: SdkTracerProvider) -> Result<()> {
    // Both futures yield `anyhow::Result<()>`, so the winning branch's result
    // can be returned as-is. The losing future is dropped by `select!`.
    tokio::select! {
        outcome = run(state) => outcome,
        outcome = shutdown_signal(provider) => outcome,
    }
}

async fn run(state: AppHandle) -> anyhow::Result<()> {
    let consumer = get_or_create_stream(&state.services.jetstream, &state.config.nats).await?;

    let limit = None;

    consumer
        .messages()
        .await?
        .for_each_concurrent(limit, |message| {
            let state = state.clone();
            tokio::spawn(async move {
                if let Ok(message) = message
                    && let Err(e) = aggregate::handle(message, state).await
                {
                    error!("{}", e.to_string());
                }
            });
            future::ready(())
        })
        .await;

    Ok(())
}

async fn get_or_create_stream(
    jetstream: &Context,
    nats: &NatsConfig,
) -> anyhow::Result<Consumer<Config>> {
    debug!(name = ?nats.name, subjects = ?nats.subjects, "getting or creating stream");
    let stream = jetstream
        .get_or_create_stream(async_nats::jetstream::stream::Config {
            name: nats.name.to_string(),
            subjects: nats.subjects.iter().map(|v| v.to_string()).collect(),
            ..Default::default()
        })
        .await?;
    let durable = nats.durable_name.to_string();
    // Get or create a pull-based consumer
    let consumer = stream
        .get_or_create_consumer(
            durable.as_ref(),
            async_nats::jetstream::consumer::pull::Config {
                durable_name: Some(durable.to_string()),
                ..Default::default()
            },
        )
        .await?;

    info!(subject = ?nats.subjects, "ready to receive messages");
    Ok(consumer)
}

/// Waits for a shutdown request, then shuts down the tracer provider.
///
/// Completes on Ctrl+C on every platform; on Unix it also completes on
/// `SIGTERM`. The result of `provider.shutdown()` is intentionally ignored, so
/// this function always returns `Ok(())`.
///
/// # Panics
///
/// Panics if a signal handler cannot be installed.
async fn shutdown_signal(provider: SdkTracerProvider) -> Result<()> {
    let interrupt = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let sigterm = async {
        let mut stream = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler");
        stream.recv().await;
    };

    // Platforms without Unix signals only react to Ctrl+C; this branch never
    // completes.
    #[cfg(not(unix))]
    let sigterm = std::future::pending::<()>();

    tokio::select! {
        () = interrupt => {}
        () = sigterm => {}
    }

    // Best effort: a failed shutdown is not reported to the caller.
    let _ = provider.shutdown();

    Ok(())
}