diff options
-rw-r--r-- | Cargo.lock | 11 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/entity/user.rs | 32 | ||||
-rw-r--r-- | src/server.rs | 21 | ||||
-rw-r--r-- | src/server/activities.rs | 1 | ||||
-rw-r--r-- | src/server/activities/accept.rs | 52 | ||||
-rw-r--r-- | src/server/activities/follow.rs | 40 | ||||
-rw-r--r-- | src/server/routes/users.rs | 20 | ||||
-rw-r--r-- | src/server/routes/users/post_inbox.rs | 16 | ||||
-rw-r--r-- | src/state.rs | 6 |
10 files changed, 188 insertions, 13 deletions
@@ -1453,6 +1453,15 @@ dependencies = [ ] [[package]] +name = "nanoid" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ffa00dec017b5b1a8b7cf5e2c008bfda1aa7e0697ac1508b491fdf2622fb4d8" +dependencies = [ + "rand 0.8.5", +] + +[[package]] name = "nu-ansi-term" version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2052,6 +2061,8 @@ dependencies = [ "axum", "clap", "config", + "enum_delegate", + "nanoid", "serde", "serde_json", "sqlx", @@ -14,6 +14,8 @@ async-trait = "0.1.88" axum = { version = "0.8.4", features = ["macros"] } clap = { version = "4.5.41", features = ["derive"] } config = { version = "0.15.13", default-features = false, features = ["toml"] } +enum_delegate = "0.2.0" +nanoid = "0.4.0" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" sqlx = { version = "0.8.6", features = ["macros", "migrate", "runtime-tokio", "time", "tls-rustls", "uuid"] } diff --git a/src/entity/user.rs b/src/entity/user.rs index 47a761a..1abf50f 100644 --- a/src/entity/user.rs +++ b/src/entity/user.rs @@ -3,12 +3,14 @@ pub mod followers; use std::fmt::Display; use activitypub_federation::{ + activity_queue::queue_activity, + activity_sending::SendActivityTask, config::Data, fetch::object_id::ObjectId, http_signatures::generate_actor_keypair, kinds::actor::{ApplicationType, GroupType, OrganizationType, PersonType, ServiceType}, - protocol::public_key::PublicKey, - traits::{Actor, Object}, + protocol::{context::WithContext, public_key::PublicKey}, + traits::{Activity, Actor, Object}, }; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -21,7 +23,7 @@ use uuid::Uuid; use crate::{error::AppError, state::AppHandle}; #[derive(PartialEq, Clone, Debug)] -pub(crate) struct User { +pub struct User { pub id: String, pub username: String, pub ap_id: ObjectId<User>, @@ -166,6 +168,30 @@ impl User { ).fetch_one(&services.postgres).await?; Self::try_from(user) } + + pub(crate) async fn send<A>( + &self, + activity: A, + recipients: Vec<Url>, + use_queue: bool, + data: &Data<AppHandle>, + ) -> Result<(), AppError> + where + A: Activity + Serialize + std::fmt::Debug + Send + Sync, + <A as Activity>::Error: From<anyhow::Error> + From<serde_json::Error>, + { + let activity = WithContext::new_default(activity); + // Send through queue in some cases and bypass it in others to test both code paths + if use_queue { + queue_activity(&activity, self, recipients, data).await?; + } else { + let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?; + for send in sends { + send.sign_and_send(data).await?; + } + } + Ok(()) + } } #[derive(Clone, Debug, Deserialize, Serialize)] diff --git a/src/server.rs b/src/server.rs index 7ed3f87..dd49a54 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,12 +1,31 @@ use activitypub_federation::config::{FederationConfig, FederationMiddleware}; use axum::{Router, routing::get}; +use nanoid::nanoid; +use stack_up::Environment; use tower_http::trace::TraceLayer; +use url::Url; -use crate::{server::routes::health_check, state::AppHandle}; +use crate::{error::AppError, server::routes::health_check, state::AppHandle}; pub mod activities; pub mod routes; +const ALPHABET: [char; 36] = [ + '2', '3', '4', '5', '6', '7', '8', '9', '_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', + 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '-', +]; + +pub fn generate_object_id(domain: &str, env: Environment) -> Result<Url, AppError> { + let id = nanoid!(21, &ALPHABET); + Ok(Url::parse(&format!( + "{}://{domain}/objects/{id}", + match env { + Environment::Development => "http", + Environment::Production => "https", + }, + ))?) +} + pub fn router(state: FederationConfig<AppHandle>) -> Router { Router::new() .merge(routes::users::users_router()) diff --git a/src/server/activities.rs b/src/server/activities.rs index 62d6fa2..5e2ad4b 100644 --- a/src/server/activities.rs +++ b/src/server/activities.rs @@ -1 +1,2 @@ +pub mod accept; pub mod follow; diff --git a/src/server/activities/accept.rs b/src/server/activities/accept.rs new file mode 100644 index 0000000..44f26f6 --- /dev/null +++ b/src/server/activities/accept.rs @@ -0,0 +1,52 @@ +use crate::{ + entity::user::User, error::AppError, server::activities::follow::Follow, state::AppHandle, +}; +use activitypub_federation::{ + config::Data, fetch::object_id::ObjectId, kinds::activity::AcceptType, traits::Activity, +}; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Accept { + actor: ObjectId<User>, + object: Follow, + #[serde(rename = "type")] + kind: AcceptType, + id: Url, +} + +impl Accept { + pub fn new(actor: ObjectId<User>, object: Follow, id: Url) -> Accept { + Accept { + actor, + object, + kind: Default::default(), + id, + } + } +} + +#[async_trait] +impl Activity for Accept { + type DataType = AppHandle; + type Error = AppError; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive(self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { + Ok(()) + } +} diff --git a/src/server/activities/follow.rs b/src/server/activities/follow.rs index 9148f02..466edb7 100644 --- a/src/server/activities/follow.rs +++ b/src/server/activities/follow.rs @@ -1,11 +1,20 @@ use activitypub_federation::{ - config::Data, fetch::object_id::ObjectId, kinds::activity::FollowType, traits::Activity, + config::Data, + fetch::object_id::ObjectId, + kinds::activity::FollowType, + traits::{Activity, Actor}, }; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use url::Url; +use uuid::Uuid; -use crate::{entity::user::User, error::AppError, state::AppHandle}; +use crate::{ + entity::user::User, + error::AppError, + server::{activities::accept::Accept, generate_object_id}, + state::AppHandle, +}; #[derive(Deserialize, Serialize, Clone, Debug)] #[serde(rename_all = "camelCase")] @@ -39,20 +48,20 @@ impl Activity for Follow { #[doc = " `id` field of the activity"] fn id(&self) -> &Url { - todo!() + &self.id } #[doc = " `actor` field of activity"] fn actor(&self) -> &Url { - todo!() + self.actor.inner() } #[doc = " Verifies that the received activity is valid."] #[doc = ""] #[doc = " This needs to be a separate method, because it might be used for activities"] #[doc = " like `Undo/Follow`, which shouldn\'t perform any database write for the inner `Follow`."] - async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { - todo!() + async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> { + Ok(()) } #[doc = " Called when an activity is received."] @@ -60,6 +69,23 @@ impl Activity for Follow { #[doc = " Should perform validation and possibly write action to the database. In case the activity"] #[doc = " has a nested `object` field, must call `object.from_json` handler."] async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> { - todo!() + let id = Uuid::now_v7(); + + sqlx::query!("insert into following (id, follower, followee) values ($1, $2, $3) on conflict (follower, followee) do nothing" + ,id, + self.actor.inner().as_str(), + self.object.inner().as_str(), + ).execute(&data.services.postgres).await?; + + let follower = self.actor.dereference(data).await?; + let id = generate_object_id(data.domain(), data.environment)?; + + let local_user = self.object.dereference(data).await?; + let accept = Accept::new(self.object.clone(), self, id.clone()); + + local_user + .send(accept, vec![follower.shared_inbox_or_inbox()], false, data) + .await?; + Ok(()) } } diff --git a/src/server/routes/users.rs b/src/server/routes/users.rs index d3ce446..9c9a3bf 100644 --- a/src/server/routes/users.rs +++ b/src/server/routes/users.rs @@ -1,12 +1,30 @@ pub mod get_outbox; +pub mod post_inbox; pub mod get_user; pub mod webfinger; -use axum::{Router, routing::get}; +use activitypub_federation::traits::Activity; +use axum::{routing::{get, post}, Router}; +use serde::{Deserialize, Serialize}; + +use crate::server::activities::{accept::Accept, follow::Follow}; +use url::Url; +use activitypub_federation::config::Data; + +/// List of all activities which this actor can receive. +#[derive(Deserialize, Serialize, Debug)] +#[serde(untagged)] +#[enum_delegate::implement(Activity)] +pub enum PersonAcceptedActivities { + Follow(Follow), + Accept(Accept), +} + pub fn users_router() -> Router { Router::new() .route("/users/{username}", get(get_user::http_get_user)) .route("/users/{username}/outbox", get(get_outbox::http_get_outbox)) + .route("/users/{username}/inbox", post(post_inbox::http_post_user_inbox)) .route("/.well-known/webfinger", get(webfinger::webfinger)) } diff --git a/src/server/routes/users/post_inbox.rs b/src/server/routes/users/post_inbox.rs new file mode 100644 index 0000000..5e3258b --- /dev/null +++ b/src/server/routes/users/post_inbox.rs @@ -0,0 +1,16 @@ +use activitypub_federation::{axum::inbox::{receive_activity, ActivityData}, config::Data, protocol::context::WithContext}; +use axum::response::IntoResponse; + +use crate::{entity::user::User, server::routes::users::PersonAcceptedActivities, state::AppHandle}; + +pub async fn http_post_user_inbox( + data: Data<AppHandle>, + activity_data: ActivityData, +) -> impl IntoResponse { + receive_activity::<WithContext<PersonAcceptedActivities>, User, AppHandle>( + activity_data, + &data, + ) + .await +} + diff --git a/src/state.rs b/src/state.rs index 1ae6caa..9129030 100644 --- a/src/state.rs +++ b/src/state.rs @@ -18,6 +18,7 @@ impl Deref for AppHandle { pub struct AppState { pub services: Services, + pub environment: Environment, } impl AppState { @@ -38,7 +39,10 @@ impl AppState { let config = FederationConfig::builder() .domain(&warden_config.hostname) .signed_fetch_actor(&user) - .app_data(AppHandle(Arc::new(Self { services }))) + .app_data(AppHandle(Arc::new(Self { + services, + environment: configuration.application.env, + }))) // .url_verifier(Box::new(MyUrlVerifier())) // TODO: could change this to env variable? .debug(configuration.application.env == Environment::Development) |