From 377487e86984441041c23261515bb907fe8a8d06 Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Tue, 22 Jul 2025 13:26:56 +0200 Subject: feat: followers --- Cargo.lock | 1 + contrib/bruno/users/follow.bru | 6 +- contrib/bruno/users/webfinger.bru | 4 + crates/sellershut/Cargo.toml | 2 +- crates/sellershut/src/entity/user.rs | 16 ++ crates/sellershut/src/entity/user/followers.rs | 289 ++++++++++++++++++++++ crates/sellershut/src/server/activities/follow.rs | 4 +- crates/sellershut/src/server/routes/users.rs | 5 + 8 files changed, 319 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f2b51d..67f403c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -576,6 +576,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" dependencies = [ "powerfmt", + "serde", ] [[package]] diff --git a/contrib/bruno/users/follow.bru b/contrib/bruno/users/follow.bru index 89cf4e1..7c84f34 100644 --- a/contrib/bruno/users/follow.bru +++ b/contrib/bruno/users/follow.bru @@ -7,11 +7,7 @@ meta { post { url: {{HUT_HOSTNAME}}/users/sellershut/inbox body: none - auth: inherit -} - -headers { - Content-Type: application/activity+json + auth: none } script:pre-request { diff --git a/contrib/bruno/users/webfinger.bru b/contrib/bruno/users/webfinger.bru index a6d15bf..d74f19e 100644 --- a/contrib/bruno/users/webfinger.bru +++ b/contrib/bruno/users/webfinger.bru @@ -13,3 +13,7 @@ get { params:query { resource: acct:sellershut@{{HUT_DOMAIN}} } + +assert { + res.status: eq 200 +} diff --git a/crates/sellershut/Cargo.toml b/crates/sellershut/Cargo.toml index 7501da5..a730dd3 100644 --- a/crates/sellershut/Cargo.toml +++ b/crates/sellershut/Cargo.toml @@ -23,7 +23,7 @@ serde = { workspace = true, features = ["derive"] } serde_json.workspace = true sha2 = "0.10.9" sqlx = { workspace = true, features = ["macros", "migrate", "runtime-tokio", "time", "tls-rustls", "uuid"] } -time = { version = "0.3.41", default-features = false, features = ["parsing"] } +time = { version = "0.3.41", default-features = false, features = ["parsing", "serde"] } tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tower = { workspace = true, features = ["util"] } tower-http = { workspace = true, features = ["map-request-body", "trace", "util"] } diff --git a/crates/sellershut/src/entity/user.rs b/crates/sellershut/src/entity/user.rs index 2d09acc..d58f4eb 100644 --- a/crates/sellershut/src/entity/user.rs +++ b/crates/sellershut/src/entity/user.rs @@ -36,6 +36,7 @@ pub struct User { pub user_type: UserType, } +#[derive(Serialize, Debug)] pub struct DbUser { pub id: String, pub description: Option, @@ -214,6 +215,21 @@ pub struct Person { image: Option, } +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PartialPerson { + #[serde(rename = "type")] + kind: UserType, + preferred_username: String, + id: ObjectId, +} + +impl From for PartialPerson { + fn from(value: Person) -> Self { + Self{ kind: value.kind, preferred_username: value.preferred_username, id: value.id } + } +} + impl Person { pub fn public_id(&self) -> &str { &self.public_key.id diff --git a/crates/sellershut/src/entity/user/followers.rs b/crates/sellershut/src/entity/user/followers.rs index 8b13789..9f60dde 100644 --- a/crates/sellershut/src/entity/user/followers.rs +++ b/crates/sellershut/src/entity/user/followers.rs @@ -1 +1,290 @@ +use activitypub_federation::{ + config::Data, + kinds::collection::{OrderedCollectionPageType, OrderedCollectionType}, + traits::Object, +}; +use async_trait::async_trait; +use base64::{Engine, engine::general_purpose}; +use serde::{Deserialize, Serialize}; +use time::{OffsetDateTime, format_description::well_known::Rfc3339}; +use tracing::trace; +use url::Url; +use crate::{ + entity::user::{DbUser, PartialPerson, Person, User, UserType}, + error::AppError, + server::activities::follow::Follow, + state::AppHandle, +}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Follower { + pub(crate) user_id: String, + pub(crate) cursor: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct FollowersCollection { + id: Url, + #[serde(rename = "type")] + kind: OrderedCollectionType, + #[serde(skip_serializing_if = "Option::is_none")] + summary: Option, + total_items: usize, + #[serde(skip_serializing_if = "Option::is_none")] + first: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct FollowersPage { + id: Url, + #[serde(rename = "type")] + kind: OrderedCollectionPageType, + #[serde(skip_serializing_if = "Option::is_none")] + summary: Option, + ordered_items: Vec, + next: Option, + part_of: Url, +} + +#[derive(Serialize)] +struct Row { + activity: sqlx::types::Json, + id: String, + description: Option, + username: String, + ap_id: String, + private_key: Option, + public_key: String, + inbox: String, + outbox: Option, + avatar_url: Option, + local: bool, + updated_at: OffsetDateTime, + created_at: OffsetDateTime, + user_type: UserType, +} + +struct DbFollower { + user_type: UserType, + ap_id: String, + username: String, +} + +#[async_trait] +impl Object for Follower { + #[doc = " App data type passed to handlers. Must be identical to"] + #[doc = " [crate::config::FederationConfigBuilder::app_data] type."] + type DataType = AppHandle; + + #[doc = " The type of protocol struct which gets sent over network to federate this database struct."] + type Kind = FollowersCollection; + + #[doc = " Error type returned by handler methods"] + type Error = AppError; + + #[doc = " `id` field of the object"] + fn id(&self) -> &Url { + todo!() + } + + #[doc = " Try to read the object with given `id` from local database."] + #[doc = ""] + #[doc = " Should return `Ok(None)` if not found."] + async fn read_from_id( + object_id: Url, + data: &Data, + ) -> Result, Self::Error> { + todo!() + } + + #[doc = " Convert database type to Activitypub type."] + #[doc = ""] + #[doc = " Called when a local object gets fetched by another instance over HTTP, or when an object"] + #[doc = " gets sent in an activity."] + async fn into_json(self, data: &Data) -> Result { + let domain = data.domain(); + trace!(domain = domain, "setting domain"); + let stub = self.user_id.replace("/followers", ""); + + let actor = Url::parse(&format!("http://{domain}{stub}"))?.to_string(); + trace!("{actor}"); + + let transaction: Result, _> = if let Some(cursor) = self.cursor { + trace!("getting with pagination"); + let result = general_purpose::STANDARD.decode(cursor)?; + let tokens = String::from_utf8_lossy(result.as_slice()); + let mut tokens = tokens.split("|"); + let id = tokens.next().unwrap(); + let ts = tokens.next().unwrap(); + let ts = OffsetDateTime::parse(ts, &Rfc3339)?; + sqlx::query_as!( + Row, + r#" + select + p.activity as "activity: sqlx::types::Json", + a.* as person + from + activity p + inner join account a on p.actor=a.ap_id + where + ( + p.activity->'type' ? $1 + and + p.actor = $2 + and + p.activity_id > $3 + ) + or + p.created_at >= $4 + order by + p.created_at asc + limit 20 + "#, + "Follow", + actor, + id, + ts + ) + .fetch_all(&data.services.postgres) + .await? + .into_iter() + .map(|value| { + serde_json::from_value::(value.activity.0).map(|follow| { + ( + follow, + DbUser { + id: value.id, + description: value.description, + username: value.username, + ap_id: value.ap_id, + private_key: value.private_key, + public_key: value.public_key, + inbox: value.inbox, + outbox: value.outbox, + avatar_url: value.avatar_url, + local: value.local, + updated_at: value.updated_at, + created_at: value.created_at, + user_type: value.user_type, + }, + ) + }) + }) + .collect() + } else { + sqlx::query_as!( + Row, + r#" + select + p.activity as "activity: sqlx::types::Json", + a.* as person + from + activity p + inner join account a on p.actor=a.ap_id + where + p.activity->'type' ? $1 + and + p.actor = $2 + order by + p.created_at asc + limit 20 + "#, + "Follow", + actor + ) + .fetch_all(&data.services.postgres) + .await? + .into_iter() + .map(|value| { + serde_json::from_value::(value.activity.0).map(|follow| { + ( + follow, + DbUser { + id: value.id, + description: value.description, + username: value.username, + ap_id: value.ap_id, + private_key: value.private_key, + public_key: value.public_key, + inbox: value.inbox, + outbox: value.outbox, + avatar_url: value.avatar_url, + local: value.local, + updated_at: value.updated_at, + created_at: value.created_at, + user_type: value.user_type, + }, + ) + }) + }) + .collect() + }; + + let transaction = transaction?; + trace!("{transaction:?}"); + + let mut url = Url::parse(&format!("http://{domain}{}", self.user_id))?; + trace!(domain = domain, "setting path"); + + let mut users = Vec::with_capacity(transaction.len()); + let mut timestamps = Vec::with_capacity(transaction.len()); + + for (_, follower) in transaction.into_iter() { + let ts = follower.created_at.format(&Rfc3339)?; + let map = User::try_from(follower)?; + users.push(map.into_json(data)); + timestamps.push(ts); + } + let ordered_items: Vec<_> = futures_util::future::try_join_all(users).await?.into_iter().map(PartialPerson::from).collect(); + + trace!("{ordered_items:?}"); + + Ok(Self::Kind { + id: url.clone(), + kind: OrderedCollectionType::OrderedCollection, + summary: Some("the followers".to_owned()), + total_items: ordered_items.len(), + first: ordered_items.first().map(|user| FollowersPage { + part_of: url.clone(), + id: { + let cursor = format!("{}|{}", user.id, ×tamps[0]); + let cursor = general_purpose::STANDARD.encode(cursor); + + url.set_query(Some(&format!("cursor={cursor}"))); + url + }, + kind: OrderedCollectionPageType::OrderedCollectionPage, + summary: None, + ordered_items: ordered_items.clone(), + next: None, + }), + }) + } + + #[doc = " Verifies that the received object is valid."] + #[doc = ""] + #[doc = " You should check here that the domain of id matches `expected_domain`. Additionally you"] + #[doc = " should perform any application specific checks."] + #[doc = ""] + #[doc = " It is necessary to use a separate method for this, because it might be used for activities"] + #[doc = " like `Delete/Note`, which shouldn\'t perform any database write for the inner `Note`."] + async fn verify( + json: &Self::Kind, + expected_domain: &Url, + data: &Data, + ) -> Result<(), Self::Error> { + todo!() + } + + #[doc = " Convert object from ActivityPub type to database type."] + #[doc = ""] + #[doc = " Called when an object is received from HTTP fetch or as part of an activity. This method"] + #[doc = " should write the received object to database. Note that there is no distinction between"] + #[doc = " create and update, so an `upsert` operation should be used."] + async fn from_json(json: Self::Kind, data: &Data) -> Result { + todo!() + } +} diff --git a/crates/sellershut/src/server/activities/follow.rs b/crates/sellershut/src/server/activities/follow.rs index 07d2793..6a8fca5 100644 --- a/crates/sellershut/src/server/activities/follow.rs +++ b/crates/sellershut/src/server/activities/follow.rs @@ -22,8 +22,8 @@ pub struct Follow { pub actor: ObjectId, pub object: ObjectId, #[serde(rename = "type")] - kind: FollowType, - id: Url, + pub kind: FollowType, + pub id: Url, } impl Follow { diff --git a/crates/sellershut/src/server/routes/users.rs b/crates/sellershut/src/server/routes/users.rs index 56078b6..ad5d258 100644 --- a/crates/sellershut/src/server/routes/users.rs +++ b/crates/sellershut/src/server/routes/users.rs @@ -1,3 +1,4 @@ +pub mod followers; pub mod get_outbox; pub mod get_user; pub mod post_inbox; @@ -32,6 +33,10 @@ pub enum PersonAcceptedActivities { pub fn users_router(state: FederationConfig) -> Router { Router::new() .route("/users/{username}", get(get_user::http_get_user)) + .route( + "/users/{username}/followers", + get(followers::http_get_followers), + ) .route("/users/{username}/outbox", get(get_outbox::http_get_outbox)) .route( "/users/{username}/inbox", -- cgit v1.2.3