diff options
Diffstat (limited to 'crates/users-service/src')
-rw-r--r-- | crates/users-service/src/cnfg.rs | 8 | ||||
-rw-r--r-- | crates/users-service/src/main.rs | 109 | ||||
-rw-r--r-- | crates/users-service/src/server.rs | 2 | ||||
-rw-r--r-- | crates/users-service/src/server/interceptor.rs | 11 | ||||
-rw-r--r-- | crates/users-service/src/server/manager.rs | 85 | ||||
-rw-r--r-- | crates/users-service/src/state.rs | 65 |
6 files changed, 280 insertions, 0 deletions
diff --git a/crates/users-service/src/cnfg.rs b/crates/users-service/src/cnfg.rs new file mode 100644 index 0000000..fec4cf7 --- /dev/null +++ b/crates/users-service/src/cnfg.rs @@ -0,0 +1,8 @@ +use serde::Deserialize; + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct LocalConfig { + pub temp_ttl: u64, + pub cache_ttl: u64, +} diff --git a/crates/users-service/src/main.rs b/crates/users-service/src/main.rs new file mode 100644 index 0000000..218c74a --- /dev/null +++ b/crates/users-service/src/main.rs @@ -0,0 +1,109 @@ +mod cnfg; +mod server; +mod state; +use std::net::{Ipv6Addr, SocketAddr}; + +use clap::Parser; +use sellershut_core::users::users_service_server::UsersServiceServer; +use stack_up::{Configuration, Services, tracing::Tracing}; +use tokio::signal; +use tonic::transport::{Server, server::TcpIncoming}; +use tracing::{error, info}; + +use crate::{ + server::interceptor::MyInterceptor, + state::{AppHandle, AppState}, +}; + +/// sellershut-profiles +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option<std::path::PathBuf>, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + let config = include_str!("../users.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder().build(&config.monitoring); + + let mut services = Services::builder() + .postgres(&config.database) + .await + .inspect_err(|e| error!("database: {e}"))? + .cache(&config.cache) + .await + .inspect_err(|e| error!("cache: {e}"))? + .build(); + + let postgres = services + .postgres + .take() + .ok_or_else(|| anyhow::anyhow!("database is not ready"))?; + + let cache = services + .cache + .take() + .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + + let services = crate::state::Services::new(postgres, cache); + + let state = AppState::create(services, &config).await?; + + let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port)); + + let listener = tokio::net::TcpListener::bind(addr).await?; + + info!(addr = ?addr, "starting server"); + + Server::builder() + .trace_fn(|_| tracing::info_span!(env!("CARGO_PKG_NAME"))) + .add_service(UsersServiceServer::with_interceptor( + state.clone(), + MyInterceptor, + )) + .serve_with_incoming_shutdown(TcpIncoming::from(listener), shutdown_signal(state)) + .await?; + + Ok(()) +} +async fn shutdown_signal(state: AppHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + }, + _ = terminate => { + }, + } +} diff --git a/crates/users-service/src/server.rs b/crates/users-service/src/server.rs new file mode 100644 index 0000000..b2e04f9 --- /dev/null +++ b/crates/users-service/src/server.rs @@ -0,0 +1,2 @@ +pub mod interceptor; +pub mod manager; diff --git a/crates/users-service/src/server/interceptor.rs b/crates/users-service/src/server/interceptor.rs new file mode 100644 index 0000000..6fbe7fa --- /dev/null +++ b/crates/users-service/src/server/interceptor.rs @@ -0,0 +1,11 @@ +use tonic::{Status, service::Interceptor}; +use tracing::Span; + +#[derive(Clone, Copy)] +pub struct MyInterceptor; + +impl Interceptor for MyInterceptor { + fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + Ok(request) + } +} diff --git a/crates/users-service/src/server/manager.rs b/crates/users-service/src/server/manager.rs new file mode 100644 index 0000000..6affb4a --- /dev/null +++ b/crates/users-service/src/server/manager.rs @@ -0,0 +1,85 @@ +use prost::Message; +use sellershut_core::users::{ + CompleteUserRequest, CreateUserRequest, CreateUserResponse, User, + users_service_server::UsersService, +}; +use stack_up::redis::AsyncCommands; +use tonic::{Request, Response, Status, async_trait}; +use tracing::{error, trace}; +use uuid::Uuid; + +use crate::state::AppHandle; + +#[async_trait] +impl UsersService for AppHandle { + #[doc = " Create a new user profile"] + async fn create_user( + &self, + request: Request<CreateUserRequest>, + ) -> Result<Response<CreateUserResponse>, Status> { + trace!("creating user"); + let data = request.into_inner(); + let id = Uuid::now_v7().to_string(); + + let bytes = data.encode_to_vec(); + let mut cache = self.cache().await?; + cache + .set_ex::<_, _, ()>(&id, &bytes, self.local_config.temp_ttl) + .await + .inspect_err(|e| error!("{e}")) + .map_err(|_e| Status::internal("storage not ready"))?; + + Ok(Response::new(CreateUserResponse { temp_id: id })) + } + + #[doc = " Complete Profile"] + async fn complete_user( + &self, + request: Request<CompleteUserRequest>, + ) -> Result<Response<User>, Status> { + let request = request.into_inner(); + + let mut cache = self.cache().await?; + + let resp = cache + .get_del::<_, Vec<u8>>(&request.id) + .await + .inspect_err(|e| error!("{e}")) + .map_err(|_e| Status::internal("storage not ready"))?; + + if resp.is_empty() { + return Err(Status::data_loss("user unavailable")); + } + + let create_user = CreateUserRequest::decode(resp.as_ref()) + .map_err(|_e| Status::data_loss("internal data corrupted"))?; + + let user = sqlx::query!( + "insert + into + profile ( + id, + username, + inbox, + outbox, + local, + avatar_url, + description, + user_type, + public_key + ) + values ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ", + request.id, + request.username, + request.inbox, + request.outbox, + request.local, + request.avatar.or(create_user.avatar), + request.description, + request.user_type.to_string(), + request.public_key, + ); + todo!() + } +} diff --git a/crates/users-service/src/state.rs b/crates/users-service/src/state.rs new file mode 100644 index 0000000..3f5ac7b --- /dev/null +++ b/crates/users-service/src/state.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +use sqlx::PgPool; +use stack_up::{ + Configuration, + cache::{RedisConnection, RedisManager}, +}; +use tonic::Status; +use tracing::error; + +use crate::cnfg::LocalConfig; + +#[derive(Clone)] +pub struct AppHandle(Arc<AppState>); + +impl std::ops::Deref for AppHandle { + type Target = Arc<AppState>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[derive(Clone)] +pub struct Services { + pub postgres: PgPool, + pub cache: RedisManager, +} + +impl Services { + pub fn new(postgres: PgPool, cache: RedisManager) -> Self { + Self { postgres, cache } + } +} + +pub struct AppState { + pub services: Services, + pub local_config: LocalConfig, +} + +impl AppState { + pub async fn create( + services: Services, + configuration: &Configuration, + ) -> Result<AppHandle, anyhow::Error> { + let local_config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + + Ok(AppHandle(Arc::new(Self { + services, + local_config, + }))) + } + + pub async fn cache(&self) -> Result<RedisConnection, tonic::Status> { + let cache = self + .services + .cache + .get() + .await + .inspect_err(|e| error!("{e}")) + .map_err(|_e| Status::internal("storage not ready"))?; + + Ok(cache) + } +} |