diff options
Diffstat (limited to 'crates/users-service')
-rw-r--r-- | crates/users-service/Cargo.toml | 32 | ||||
-rw-r--r-- | crates/users-service/migrations/20250726161947_profile.sql | 32 | ||||
-rw-r--r-- | crates/users-service/src/cnfg.rs | 8 | ||||
-rw-r--r-- | crates/users-service/src/main.rs | 109 | ||||
-rw-r--r-- | crates/users-service/src/server.rs | 2 | ||||
-rw-r--r-- | crates/users-service/src/server/interceptor.rs | 11 | ||||
-rw-r--r-- | crates/users-service/src/server/manager.rs | 85 | ||||
-rw-r--r-- | crates/users-service/src/state.rs | 65 | ||||
-rw-r--r-- | crates/users-service/users.toml | 37 |
9 files changed, 381 insertions, 0 deletions
diff --git a/crates/users-service/Cargo.toml b/crates/users-service/Cargo.toml new file mode 100644 index 0000000..2bbfe28 --- /dev/null +++ b/crates/users-service/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "sellershut-users" +version = "0.1.0" +edition = "2024" +license.workspace = true +homepage.workspace = true +documentation.workspace = true +description.workspace = true + +[dependencies] +anyhow.workspace = true +base64.workspace = true +clap = { workspace = true, features = ["derive"] } +config = { workspace = true, features = ["convert-case", "toml"] } +futures-util.workspace = true +nanoid.workspace = true +prost.workspace = true +sellershut-core = { workspace = true, features = ["users", "serde"] } +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +sqlx = { workspace = true, features = ["macros", "migrate", "runtime-tokio", "time", "tls-rustls", "uuid"] } +time = { workspace = true, features = ["parsing", "serde"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } +tonic.workspace = true +tonic-reflection = "0.13.0" +tracing.workspace = true +url.workspace = true +uuid = { workspace = true, features = ["serde", "v7"] } + +[dependencies.stack-up] +workspace = true +features = ["api", "cache", "postgres", "tracing"] diff --git a/crates/users-service/migrations/20250726161947_profile.sql b/crates/users-service/migrations/20250726161947_profile.sql new file mode 100644 index 0000000..15822c8 --- /dev/null +++ b/crates/users-service/migrations/20250726161947_profile.sql @@ -0,0 +1,32 @@ +create table profile ( + id text primary key, + username varchar(30) not null, + inbox text not null, + outbox text, + local boolean not null, + avatar_url text, + description text, + user_type text not null check ( + user_type IN ('PERSON', 'APPLICATION', 'GROUP', 'ORGANIZATION', 'SERVICE') + ), + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + public_key text not null +); + +create unique index unique_username_local + on profile (username) + where local = true; + +create or replace function set_updated_at() +returns trigger as $$ +begin + new.updated_at := now(); + return new; +end; +$$ language plpgsql; + +create trigger trigger_set_updated_at +before update on profile +for each row +execute function set_updated_at(); diff --git a/crates/users-service/src/cnfg.rs b/crates/users-service/src/cnfg.rs new file mode 100644 index 0000000..fec4cf7 --- /dev/null +++ b/crates/users-service/src/cnfg.rs @@ -0,0 +1,8 @@ +use serde::Deserialize; + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct LocalConfig { + pub temp_ttl: u64, + pub cache_ttl: u64, +} diff --git a/crates/users-service/src/main.rs b/crates/users-service/src/main.rs new file mode 100644 index 0000000..218c74a --- /dev/null +++ b/crates/users-service/src/main.rs @@ -0,0 +1,109 @@ +mod cnfg; +mod server; +mod state; +use std::net::{Ipv6Addr, SocketAddr}; + +use clap::Parser; +use sellershut_core::users::users_service_server::UsersServiceServer; +use stack_up::{Configuration, Services, tracing::Tracing}; +use tokio::signal; +use tonic::transport::{Server, server::TcpIncoming}; +use tracing::{error, info}; + +use crate::{ + server::interceptor::MyInterceptor, + state::{AppHandle, AppState}, +}; + +/// sellershut-profiles +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option<std::path::PathBuf>, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + let config = include_str!("../users.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder().build(&config.monitoring); + + let mut services = Services::builder() + .postgres(&config.database) + .await + .inspect_err(|e| error!("database: {e}"))? + .cache(&config.cache) + .await + .inspect_err(|e| error!("cache: {e}"))? + .build(); + + let postgres = services + .postgres + .take() + .ok_or_else(|| anyhow::anyhow!("database is not ready"))?; + + let cache = services + .cache + .take() + .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + + let services = crate::state::Services::new(postgres, cache); + + let state = AppState::create(services, &config).await?; + + let addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, config.application.port)); + + let listener = tokio::net::TcpListener::bind(addr).await?; + + info!(addr = ?addr, "starting server"); + + Server::builder() + .trace_fn(|_| tracing::info_span!(env!("CARGO_PKG_NAME"))) + .add_service(UsersServiceServer::with_interceptor( + state.clone(), + MyInterceptor, + )) + .serve_with_incoming_shutdown(TcpIncoming::from(listener), shutdown_signal(state)) + .await?; + + Ok(()) +} +async fn shutdown_signal(state: AppHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + }, + _ = terminate => { + }, + } +} diff --git a/crates/users-service/src/server.rs b/crates/users-service/src/server.rs new file mode 100644 index 0000000..b2e04f9 --- /dev/null +++ b/crates/users-service/src/server.rs @@ -0,0 +1,2 @@ +pub mod interceptor; +pub mod manager; diff --git a/crates/users-service/src/server/interceptor.rs b/crates/users-service/src/server/interceptor.rs new file mode 100644 index 0000000..6fbe7fa --- /dev/null +++ b/crates/users-service/src/server/interceptor.rs @@ -0,0 +1,11 @@ +use tonic::{Status, service::Interceptor}; +use tracing::Span; + +#[derive(Clone, Copy)] +pub struct MyInterceptor; + +impl Interceptor for MyInterceptor { + fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + Ok(request) + } +} diff --git a/crates/users-service/src/server/manager.rs b/crates/users-service/src/server/manager.rs new file mode 100644 index 0000000..6affb4a --- /dev/null +++ b/crates/users-service/src/server/manager.rs @@ -0,0 +1,85 @@ +use prost::Message; +use sellershut_core::users::{ + CompleteUserRequest, CreateUserRequest, CreateUserResponse, User, + users_service_server::UsersService, +}; +use stack_up::redis::AsyncCommands; +use tonic::{Request, Response, Status, async_trait}; +use tracing::{error, trace}; +use uuid::Uuid; + +use crate::state::AppHandle; + +#[async_trait] +impl UsersService for AppHandle { + #[doc = " Create a new user profile"] + async fn create_user( + &self, + request: Request<CreateUserRequest>, + ) -> Result<Response<CreateUserResponse>, Status> { + trace!("creating user"); + let data = request.into_inner(); + let id = Uuid::now_v7().to_string(); + + let bytes = data.encode_to_vec(); + let mut cache = self.cache().await?; + cache + .set_ex::<_, _, ()>(&id, &bytes, self.local_config.temp_ttl) + .await + .inspect_err(|e| error!("{e}")) + .map_err(|_e| Status::internal("storage not ready"))?; + + Ok(Response::new(CreateUserResponse { temp_id: id })) + } + + #[doc = " Complete Profile"] + async fn complete_user( + &self, + request: Request<CompleteUserRequest>, + ) -> Result<Response<User>, Status> { + let request = request.into_inner(); + + let mut cache = self.cache().await?; + + let resp = cache + .get_del::<_, Vec<u8>>(&request.id) + .await + .inspect_err(|e| error!("{e}")) + .map_err(|_e| Status::internal("storage not ready"))?; + + if resp.is_empty() { + return Err(Status::data_loss("user unavailable")); + } + + let create_user = CreateUserRequest::decode(resp.as_ref()) + .map_err(|_e| Status::data_loss("internal data corrupted"))?; + + let user = sqlx::query!( + "insert + into + profile ( + id, + username, + inbox, + outbox, + local, + avatar_url, + description, + user_type, + public_key + ) + values ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ", + request.id, + request.username, + request.inbox, + request.outbox, + request.local, + request.avatar.or(create_user.avatar), + request.description, + request.user_type.to_string(), + request.public_key, + ); + todo!() + } +} diff --git a/crates/users-service/src/state.rs b/crates/users-service/src/state.rs new file mode 100644 index 0000000..3f5ac7b --- /dev/null +++ b/crates/users-service/src/state.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +use sqlx::PgPool; +use stack_up::{ + Configuration, + cache::{RedisConnection, RedisManager}, +}; +use tonic::Status; +use tracing::error; + +use crate::cnfg::LocalConfig; + +#[derive(Clone)] +pub struct AppHandle(Arc<AppState>); + +impl std::ops::Deref for AppHandle { + type Target = Arc<AppState>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[derive(Clone)] +pub struct Services { + pub postgres: PgPool, + pub cache: RedisManager, +} + +impl Services { + pub fn new(postgres: PgPool, cache: RedisManager) -> Self { + Self { postgres, cache } + } +} + +pub struct AppState { + pub services: Services, + pub local_config: LocalConfig, +} + +impl AppState { + pub async fn create( + services: Services, + configuration: &Configuration, + ) -> Result<AppHandle, anyhow::Error> { + let local_config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + + Ok(AppHandle(Arc::new(Self { + services, + local_config, + }))) + } + + pub async fn cache(&self) -> Result<RedisConnection, tonic::Status> { + let cache = self + .services + .cache + .get() + .await + .inspect_err(|e| error!("{e}")) + .map_err(|_e| Status::internal("storage not ready"))?; + + Ok(cache) + } +} diff --git a/crates/users-service/users.toml b/crates/users-service/users.toml new file mode 100644 index 0000000..9706fcf --- /dev/null +++ b/crates/users-service/users.toml @@ -0,0 +1,37 @@ +[application] +env = "development" +port = 1610 + +[monitoring] +log-level = "sellershut_users=trace,info" + +[misc] +temp-ttl = 1000 +cache-ttl = 300 + +[database] +pool_size = 100 +port = 5432 +name = "profiles" +host = "localhost" +password = "password" +user = "postgres" + +[nats] +hosts = ["nats://localhost:4222"] + +[cache] +dsn = "redis://localhost:6379" +pooled = true +type = "non-clustered" # clustered, non-clustered or sentinel +max-connections = 100 + +[cache.sentinel] +master-name = "mymaster" +nodes = [ + { host = "127.0.0.1", port = 26379 }, + { host = "127.0.0.2", port = 26379 }, + { host = "127.0.0.3", port = 26379 }, +] + +# vim:ft=toml |