Browse Source

implement ActivitySender actor (#89)

Merge pull request 'Adding unique ap_ids. Fixes #1100' (#90) from unique_ap_ids into activity-sender

Reviewed-on: https://yerbamate.dev/LemmyNet/lemmy/pulls/90

Adding back in on_conflict.

Trying to add back in the on_conflict_do_nothing.

Trying to reduce delay time.

Removing createFakes.

Removing some unit tests.

Adding comment jest timeout.

Fixing tests again.

Fixing tests again.

Merge branch 'activity-sender' into unique_ap_ids_2

Replace actix client with reqwest to speed up federation tests

Trying to fix tests again.

Fixing unit tests.

Fixing some broken unit tests, not done yet.

Adding uniques.

Adding unique ap_ids. Fixes #1100

use proper sql functionality for upsert

added logging

in fetcher, replace post/comment::create with upsert

no need to do an actual update in post/comment::upsert

Merge branch 'main' into activity-sender

implement upsert for user/community

reuse http client

got it working

attempt to use background-jobs crate

rewrite with proper error handling and less boilerplate

remove do_send, dont return errors from activity_sender

WIP: implement ActivitySender actor

Co-authored-by: dessalines <[email protected]>
Co-authored-by: Dessalines <[email protected]>
Co-authored-by: Felix Ableitner <[email protected]>
Reviewed-on: https://yerbamate.dev/LemmyNet/lemmy/pulls/89
feature/settings-cleanup
nutomic 2 years ago
committed by dessalines
parent
commit
f13bf3a788
  1. 198
      server/Cargo.lock
  2. 2
      server/Cargo.toml
  3. 2
      server/lemmy_db/src/activity.rs
  4. 29
      server/lemmy_db/src/comment.rs
  5. 12
      server/lemmy_db/src/comment_view.rs
  6. 16
      server/lemmy_db/src/community.rs
  7. 10
      server/lemmy_db/src/moderator.rs
  8. 2
      server/lemmy_db/src/password_reset_request.rs
  9. 25
      server/lemmy_db/src/post.rs
  10. 10
      server/lemmy_db/src/post_view.rs
  11. 21
      server/lemmy_db/src/private_message.rs
  12. 64
      server/lemmy_db/src/schema.rs
  13. 13
      server/lemmy_db/src/user.rs
  14. 10
      server/lemmy_db/src/user_mention.rs
  15. 27
      server/migrations/2020-08-25-132005_add_unique_ap_ids/down.sql
  16. 56
      server/migrations/2020-08-25-132005_add_unique_ap_ids/up.sql
  17. 2
      server/src/api/comment.rs
  18. 4
      server/src/api/community.rs
  19. 4
      server/src/api/post.rs
  20. 69
      server/src/api/user.rs
  21. 68
      server/src/apub/activities.rs
  22. 133
      server/src/apub/activity_queue.rs
  23. 34
      server/src/apub/comment.rs
  24. 17
      server/src/apub/community.rs
  25. 7
      server/src/apub/extensions/signatures.rs
  26. 24
      server/src/apub/fetcher.rs
  27. 9
      server/src/apub/inbox/activities/delete.rs
  28. 9
      server/src/apub/inbox/activities/remove.rs
  29. 18
      server/src/apub/inbox/activities/undo.rs
  30. 2
      server/src/apub/inbox/shared_inbox.rs
  31. 16
      server/src/apub/inbox/user_inbox.rs
  32. 6
      server/src/apub/mod.rs
  33. 20
      server/src/apub/post.rs
  34. 31
      server/src/apub/private_message.rs
  35. 9
      server/src/apub/user.rs
  36. 10
      server/src/code_migrations.rs
  37. 25
      server/src/lib.rs
  38. 20
      server/src/main.rs
  39. 18
      server/src/request.rs
  40. 10
      server/src/websocket/server.rs

198
server/Cargo.lock

@ -490,6 +490,56 @@ dependencies = [
"serde_urlencoded",
]
[[package]]
name = "background-jobs"
version = "0.8.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb38c4a5de33324650e9023829b0f4129eb5418b29f5dfe69a52100ff5bc50d7"
dependencies = [
"background-jobs-actix",
"background-jobs-core",
]
[[package]]
name = "background-jobs-actix"
version = "0.8.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d012b9293806c777f806b537e04b5eec34ecd6eaf876c52792017695ce53262f"
dependencies = [
"actix-rt",
"anyhow",
"async-trait",
"background-jobs-core",
"chrono",
"log",
"num_cpus",
"rand 0.7.3",
"serde 1.0.114",
"serde_json",
"thiserror",
"tokio",
"uuid 0.8.1",
]
[[package]]
name = "background-jobs-core"
version = "0.8.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd5efe91c019d7780d5a2fc2f92a15e1f95b84a761428e1d1972b7428634ebc7"
dependencies = [
"actix-rt",
"anyhow",
"async-trait",
"chrono",
"futures",
"log",
"serde 1.0.114",
"serde_json",
"thiserror",
"tokio",
"uuid 0.8.1",
]
[[package]]
name = "backtrace"
version = "0.3.50"
@ -1501,6 +1551,16 @@ dependencies = [
"itoa",
]
[[package]]
name = "http-body"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b"
dependencies = [
"bytes",
"http",
]
[[package]]
name = "http-signature-normalization"
version = "0.5.2"
@ -1544,6 +1604,43 @@ dependencies = [
"quick-error",
]
[[package]]
name = "hyper"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e68a8dd9716185d9e64ea473ea6ef63529252e3e27623295a0378a19665d5eb"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"httparse",
"itoa",
"pin-project",
"socket2",
"time 0.1.43",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper-tls"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-tls",
]
[[package]]
name = "ident_case"
version = "1.0.1"
@ -1618,9 +1715,15 @@ dependencies = [
"socket2",
"widestring",
"winapi 0.3.9",
"winreg",
"winreg 0.6.2",
]
[[package]]
name = "ipnet"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135"
[[package]]
name = "itertools"
version = "0.9.0"
@ -1723,6 +1826,7 @@ dependencies = [
"anyhow",
"async-trait",
"awc",
"background-jobs",
"base64 0.12.3",
"bcrypt",
"captcha",
@ -1743,6 +1847,7 @@ dependencies = [
"openssl",
"percent-encoding",
"rand 0.7.3",
"reqwest",
"rss",
"serde 1.0.114",
"serde_json",
@ -2669,6 +2774,42 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "reqwest"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e"
dependencies = [
"base64 0.12.3",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"http",
"http-body",
"hyper",
"hyper-tls",
"ipnet",
"js-sys",
"lazy_static",
"log",
"mime",
"mime_guess",
"native-tls",
"percent-encoding",
"pin-project-lite",
"serde 1.0.114",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-tls",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg 0.7.0",
]
[[package]]
name = "resolv-conf"
version = "0.6.3"
@ -3261,6 +3402,16 @@ dependencies = [
"webpki",
]
[[package]]
name = "tokio-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.2.0"
@ -3290,6 +3441,12 @@ dependencies = [
"tokio",
]
[[package]]
name = "tower-service"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860"
[[package]]
name = "tracing"
version = "0.1.18"
@ -3350,6 +3507,12 @@ dependencies = [
"trust-dns-proto",
]
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "twoway"
version = "0.2.1"
@ -3528,6 +3691,16 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
@ -3541,6 +3714,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0563a9a4b071746dd5aedbc3a28c6fe9be4586fb3fbadb67c400d4f53c6b16c"
dependencies = [
"cfg-if",
"serde 1.0.114",
"serde_json",
"wasm-bindgen-macro",
]
@ -3559,6 +3734,18 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95f8d235a77f880bcef268d379810ea6c0af2eacfa90b1ad5af731776e0c4699"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.67"
@ -3675,6 +3862,15 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "winreg"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "winutil"
version = "0.1.1"

2
server/Cargo.toml

@ -53,3 +53,5 @@ async-trait = "0.1.36"
captcha = "0.0.7"
anyhow = "1.0.32"
thiserror = "1.0.20"
background-jobs = " 0.8.0-alpha.2"
reqwest = { version = "0.10", features = ["json"] }

2
server/lemmy_db/src/activity.rs

@ -113,7 +113,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_862362".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,

29
server/lemmy_db/src/comment.rs

@ -39,13 +39,13 @@ pub struct CommentForm {
pub published: Option<chrono::NaiveDateTime>,
pub updated: Option<chrono::NaiveDateTime>,
pub deleted: Option<bool>,
pub ap_id: String,
pub ap_id: Option<String>,
pub local: bool,
}
impl CommentForm {
pub fn get_ap_id(&self) -> Result<Url, ParseError> {
Url::parse(&self.ap_id)
Url::parse(&self.ap_id.as_ref().unwrap_or(&"not_a_url".to_string()))
}
}
@ -163,12 +163,13 @@ impl Comment {
}
pub fn upsert(conn: &PgConnection, comment_form: &CommentForm) -> Result<Self, Error> {
let existing = Self::read_from_apub_id(conn, &comment_form.ap_id);
match existing {
Err(NotFound {}) => Ok(Self::create(conn, &comment_form)?),
Ok(p) => Ok(Self::update(conn, p.id, &comment_form)?),
Err(e) => Err(e),
}
use crate::schema::comment::dsl::*;
insert_into(comment)
.values(comment_form)
.on_conflict(ap_id)
.do_update()
.set(comment_form)
.get_result::<Self>(conn)
}
}
@ -272,7 +273,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_283687".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -292,7 +293,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_928738972".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -320,7 +321,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -337,7 +338,7 @@ mod tests {
parent_id: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};
@ -354,7 +355,7 @@ mod tests {
parent_id: None,
published: inserted_comment.published,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: inserted_comment.ap_id.to_owned(),
local: true,
};
@ -368,7 +369,7 @@ mod tests {
read: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};

12
server/lemmy_db/src/comment_view.rs

@ -517,7 +517,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_92873982".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -537,7 +537,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_7625376".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -565,7 +565,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -582,7 +582,7 @@ mod tests {
read: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};
@ -627,7 +627,7 @@ mod tests {
my_vote: None,
subscribed: None,
saved: None,
ap_id: "http://fake.com".to_string(),
ap_id: inserted_comment.ap_id.to_owned(),
local: true,
community_actor_id: inserted_community.actor_id.to_owned(),
community_local: true,
@ -665,7 +665,7 @@ mod tests {
my_vote: Some(1),
subscribed: Some(false),
saved: Some(false),
ap_id: "http://fake.com".to_string(),
ap_id: inserted_comment.ap_id.to_owned(),
local: true,
community_actor_id: inserted_community.actor_id.to_owned(),
community_local: true,

16
server/lemmy_db/src/community.rs

@ -45,7 +45,7 @@ pub struct CommunityForm {
pub updated: Option<chrono::NaiveDateTime>,
pub deleted: Option<bool>,
pub nsfw: bool,
pub actor_id: String,
pub actor_id: Option<String>,
pub local: bool,
pub private_key: Option<String>,
pub public_key: Option<String>,
@ -160,6 +160,16 @@ impl Community {
.unwrap_or_default()
.contains(&user_id)
}
pub fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result<Community, Error> {
use crate::schema::community::dsl::*;
insert_into(community)
.values(community_form)
.on_conflict(actor_id)
.do_update()
.set(community_form)
.get_result::<Self>(conn)
}
}
#[derive(Identifiable, Queryable, Associations, PartialEq, Debug)]
@ -320,7 +330,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_8266238".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -340,7 +350,7 @@ mod tests {
removed: None,
deleted: None,
updated: None,
actor_id: "changeme_7625376".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,

10
server/lemmy_db/src/moderator.rs

@ -426,7 +426,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_829398".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -454,7 +454,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_82982738".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -474,7 +474,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_283687".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -502,7 +502,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -519,7 +519,7 @@ mod tests {
parent_id: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};

2
server/lemmy_db/src/password_reset_request.rs

@ -103,7 +103,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_8292378".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,

25
server/lemmy_db/src/post.rs

@ -53,13 +53,13 @@ pub struct PostForm {
pub embed_description: Option<String>,
pub embed_html: Option<String>,
pub thumbnail_url: Option<String>,
pub ap_id: String,
pub ap_id: Option<String>,
pub local: bool,
}
impl PostForm {
pub fn get_ap_id(&self) -> Result<Url, ParseError> {
Url::parse(&self.ap_id)
Url::parse(&self.ap_id.as_ref().unwrap_or(&"not_a_url".to_string()))
}
}
@ -180,12 +180,13 @@ impl Post {
}
pub fn upsert(conn: &PgConnection, post_form: &PostForm) -> Result<Post, Error> {
let existing = Self::read_from_apub_id(conn, &post_form.ap_id);
match existing {
Err(NotFound {}) => Ok(Self::create(conn, &post_form)?),
Ok(p) => Ok(Self::update(conn, p.id, &post_form)?),
Err(e) => Err(e),
}
use crate::schema::post::dsl::*;
insert_into(post)
.values(post_form)
.on_conflict(ap_id)
.do_update()
.set(post_form)
.get_result::<Self>(conn)
}
}
@ -358,7 +359,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_8292683678".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -378,7 +379,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_8223262378".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -406,7 +407,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -431,7 +432,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: inserted_post.ap_id.to_owned(),
local: true,
};

10
server/lemmy_db/src/post_view.rs

@ -423,7 +423,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_8282738268".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -443,7 +443,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_2763".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -471,7 +471,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -555,7 +555,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".to_string(),
ap_id: inserted_post.ap_id.to_owned(),
local: true,
creator_actor_id: inserted_user.actor_id.to_owned(),
creator_local: true,
@ -604,7 +604,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".to_string(),
ap_id: inserted_post.ap_id.to_owned(),
local: true,
creator_actor_id: inserted_user.actor_id.to_owned(),
creator_local: true,

21
server/lemmy_db/src/private_message.rs

@ -27,7 +27,7 @@ pub struct PrivateMessageForm {
pub read: Option<bool>,
pub published: Option<chrono::NaiveDateTime>,
pub updated: Option<chrono::NaiveDateTime>,
pub ap_id: String,
pub ap_id: Option<String>,
pub local: bool,
}
@ -119,6 +119,17 @@ impl PrivateMessage {
.set(read.eq(true))
.get_results::<Self>(conn)
}
// TODO use this
pub fn upsert(conn: &PgConnection, private_message_form: &PrivateMessageForm) -> Result<Self, Error> {
use crate::schema::private_message::dsl::*;
insert_into(private_message)
.values(private_message_form)
.on_conflict(ap_id)
.do_update()
.set(private_message_form)
.get_result::<Self>(conn)
}
}
#[cfg(test)]
@ -153,7 +164,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_6723878".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -181,7 +192,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_287263876".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -199,7 +210,7 @@ mod tests {
read: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};
@ -214,7 +225,7 @@ mod tests {
read: false,
updated: None,
published: inserted_private_message.published,
ap_id: "http://fake.com".into(),
ap_id: inserted_private_message.ap_id.to_owned(),
local: true,
};

64
server/lemmy_db/src/schema.rs

@ -523,36 +523,36 @@ joinable!(user_mention -> comment (comment_id));
joinable!(user_mention -> user_ (recipient_id));
allow_tables_to_appear_in_same_query!(
activity,
category,
comment,
comment_aggregates_fast,
comment_like,
comment_saved,
community,
community_aggregates_fast,
community_follower,
community_moderator,
community_user_ban,
mod_add,
mod_add_community,
mod_ban,
mod_ban_from_community,
mod_lock_post,
mod_remove_comment,
mod_remove_community,
mod_remove_post,
mod_sticky_post,
password_reset_request,
post,
post_aggregates_fast,
post_like,
post_read,
post_saved,
private_message,
site,
user_,
user_ban,
user_fast,
user_mention,
activity,
category,
comment,
comment_aggregates_fast,
comment_like,
comment_saved,
community,
community_aggregates_fast,
community_follower,
community_moderator,
community_user_ban,
mod_add,
mod_add_community,
mod_ban,
mod_ban_from_community,
mod_lock_post,
mod_remove_comment,
mod_remove_community,
mod_remove_post,
mod_sticky_post,
password_reset_request,
post,
post_aggregates_fast,
post_like,
post_read,
post_saved,
private_message,
site,
user_,
user_ban,
user_fast,
user_mention,
);

13
server/lemmy_db/src/user.rs

@ -57,7 +57,7 @@ pub struct UserForm {
pub show_avatars: bool,
pub send_notifications_to_email: bool,
pub matrix_user_id: Option<String>,
pub actor_id: String,
pub actor_id: Option<String>,
pub bio: Option<String>,
pub local: bool,
pub private_key: Option<String>,
@ -152,6 +152,15 @@ impl User_ {
pub fn get_profile_url(&self, hostname: &str) -> String {
format!("https://{}/u/{}", hostname, self.name)
}
pub fn upsert(conn: &PgConnection, user_form: &UserForm) -> Result<User_, Error> {
insert_into(user_)
.values(user_form)
.on_conflict(actor_id)
.do_update()
.set(user_form)
.get_result::<Self>(conn)
}
}
#[cfg(test)]
@ -180,7 +189,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_9826382637".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,

10
server/lemmy_db/src/user_mention.rs

@ -106,7 +106,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_628763".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -134,7 +134,7 @@ mod tests {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: "changeme_927389278".into(),
actor_id: None,
bio: None,
local: true,
private_key: None,
@ -154,7 +154,7 @@ mod tests {
deleted: None,
updated: None,
nsfw: false,
actor_id: "changeme_876238".into(),
actor_id: None,
local: true,
private_key: None,
public_key: None,
@ -182,7 +182,7 @@ mod tests {
embed_description: None,
embed_html: None,
thumbnail_url: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -199,7 +199,7 @@ mod tests {
parent_id: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};

27
server/migrations/2020-08-25-132005_add_unique_ap_ids/down.sql

@ -0,0 +1,27 @@
-- Drop the uniques
alter table private_message drop constraint idx_private_message_ap_id;
alter table post drop constraint idx_post_ap_id;
alter table comment drop constraint idx_comment_ap_id;
alter table user_ drop constraint idx_user_actor_id;
alter table community drop constraint idx_community_actor_id;
alter table private_message alter column ap_id set not null;
alter table private_message alter column ap_id set default 'http://fake.com';
alter table post alter column ap_id set not null;
alter table post alter column ap_id set default 'http://fake.com';
alter table comment alter column ap_id set not null;
alter table comment alter column ap_id set default 'http://fake.com';
update private_message
set ap_id = 'http://fake.com'
where ap_id like 'changeme_%';
update post
set ap_id = 'http://fake.com'
where ap_id like 'changeme_%';
update comment
set ap_id = 'http://fake.com'
where ap_id like 'changeme_%';

56
server/migrations/2020-08-25-132005_add_unique_ap_ids/up.sql

@ -0,0 +1,56 @@
-- Add unique ap_id for private_message, comment, and post
-- Need to delete the possible dupes for ones that don't start with the fake one
delete from private_message a using (
select min(id) as id, ap_id
from private_message
group by ap_id having count(*) > 1
) b
where a.ap_id = b.ap_id
and a.id <> b.id;
delete from post a using (
select min(id) as id, ap_id
from post
group by ap_id having count(*) > 1
) b
where a.ap_id = b.ap_id
and a.id <> b.id;
delete from comment a using (
select min(id) as id, ap_id
from comment
group by ap_id having count(*) > 1
) b
where a.ap_id = b.ap_id
and a.id <> b.id;
-- Replacing the current default on the columns, to the unique one
update private_message
set ap_id = generate_unique_changeme()
where ap_id = 'http://fake.com';
update post
set ap_id = generate_unique_changeme()
where ap_id = 'http://fake.com';
update comment
set ap_id = generate_unique_changeme()
where ap_id = 'http://fake.com';
-- Add the unique indexes
alter table private_message alter column ap_id set not null;
alter table private_message alter column ap_id set default generate_unique_changeme();
alter table post alter column ap_id set not null;
alter table post alter column ap_id set default generate_unique_changeme();
alter table comment alter column ap_id set not null;
alter table comment alter column ap_id set default generate_unique_changeme();
-- Add the uniques, for user_ and community too
alter table private_message add constraint idx_private_message_ap_id unique (ap_id);
alter table post add constraint idx_post_ap_id unique (ap_id);
alter table comment add constraint idx_comment_ap_id unique (ap_id);
alter table user_ add constraint idx_user_actor_id unique (actor_id);
alter table community add constraint idx_community_actor_id unique (actor_id);

2
server/src/api/comment.rs

@ -146,7 +146,7 @@ impl Perform for CreateComment {
read: None,
published: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
};

4
server/src/api/community.rs

@ -274,7 +274,7 @@ impl Perform for CreateCommunity {
deleted: None,
nsfw: data.nsfw,
updated: None,
actor_id,
actor_id: Some(actor_id),
local: true,
private_key: Some(keypair.private_key),
public_key: Some(keypair.public_key),
@ -368,7 +368,7 @@ impl Perform for EditCommunity {
deleted: Some(read_community.deleted),
nsfw: data.nsfw,
updated: Some(naive_now()),
actor_id: read_community.actor_id,
actor_id: Some(read_community.actor_id),
local: read_community.local,
private_key: read_community.private_key,
public_key: read_community.public_key,

4
server/src/api/post.rs

@ -187,7 +187,7 @@ impl Perform for CreatePost {
embed_description: iframely_description,
embed_html: iframely_html,
thumbnail_url: pictrs_thumbnail,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};
@ -518,7 +518,7 @@ impl Perform for EditPost {
embed_description: iframely_description,
embed_html: iframely_html,
thumbnail_url: pictrs_thumbnail,
ap_id: orig_post.ap_id,
ap_id: Some(orig_post.ap_id),
local: orig_post.local,
published: None,
};

69
server/src/api/user.rs

@ -410,7 +410,7 @@ impl Perform for Register {
lang: "browser".into(),
show_avatars: true,
send_notifications_to_email: false,
actor_id: make_apub_endpoint(EndpointType::User, &data.username).to_string(),
actor_id: Some(make_apub_endpoint(EndpointType::User, &data.username).to_string()),
bio: None,
local: true,
private_key: Some(user_keypair.private_key),
@ -441,37 +441,38 @@ impl Perform for Register {
let main_community_keypair = generate_actor_keypair()?;
// Create the main community if it doesn't exist
let main_community = match blocking(context.pool(), move |conn| Community::read(conn, 2))
.await?
{
Ok(c) => c,
Err(_e) => {
let default_community_name = "main";
let community_form = CommunityForm {
name: default_community_name.to_string(),
title: "The Default Community".to_string(),
description: Some("The Default Community".to_string()),
category_id: 1,
nsfw: false,
creator_id: inserted_user.id,
removed: None,
deleted: None,
updated: None,
actor_id: make_apub_endpoint(EndpointType::Community, default_community_name).to_string(),
local: true,
private_key: Some(main_community_keypair.private_key),
public_key: Some(main_community_keypair.public_key),
last_refreshed_at: None,
published: None,
icon: None,
banner: None,
};
blocking(context.pool(), move |conn| {
Community::create(conn, &community_form)
})
.await??
}
};
let main_community =
match blocking(context.pool(), move |conn| Community::read(conn, 2)).await? {
Ok(c) => c,
Err(_e) => {
let default_community_name = "main";
let community_form = CommunityForm {
name: default_community_name.to_string(),
title: "The Default Community".to_string(),
description: Some("The Default Community".to_string()),
category_id: 1,
nsfw: false,
creator_id: inserted_user.id,
removed: None,
deleted: None,
updated: None,
actor_id: Some(
make_apub_endpoint(EndpointType::Community, default_community_name).to_string(),
),
local: true,
private_key: Some(main_community_keypair.private_key),
public_key: Some(main_community_keypair.public_key),
last_refreshed_at: None,
published: None,
icon: None,
banner: None,
};
blocking(context.pool(), move |conn| {
Community::create(conn, &community_form)
})
.await??
}
};
// Sign them up for main community no matter what
let community_follower_form = CommunityFollowerForm {
@ -643,7 +644,7 @@ impl Perform for SaveUserSettings {
lang: data.lang.to_owned(),
show_avatars: data.show_avatars,
send_notifications_to_email: data.send_notifications_to_email,
actor_id: read_user.actor_id,
actor_id: Some(read_user.actor_id),
bio,
local: read_user.local,
private_key: read_user.private_key,
@ -1218,7 +1219,7 @@ impl Perform for CreatePrivateMessage {
deleted: None,
read: None,
updated: None,
ap_id: "http://fake.com".into(),
ap_id: None,
local: true,
published: None,
};

68
server/src/apub/activities.rs

@ -1,72 +1,38 @@
use crate::{
apub::{
check_is_apub_id_valid,
community::do_announce,
extensions::signatures::sign,
insert_activity,
ActorType,
},
request::retry_custom,
apub::{activity_queue::send_activity, community::do_announce, insert_activity},
LemmyContext,
LemmyError,
};
use activitystreams::base::AnyBase;
use actix_web::client::Client;
use activitystreams::{
base::{Extends, ExtendsExt},
object::AsObject,
};
use lemmy_db::{community::Community, user::User_};
use lemmy_utils::{get_apub_protocol_string, settings::Settings};
use log::debug;
use serde::{export::fmt::Debug, Serialize};
use url::{ParseError, Url};
use uuid::Uuid;
pub async fn send_activity_to_community(
pub async fn send_activity_to_community<T, Kind>(
creator: &User_,
community: &Community,
to: Vec<Url>,
activity: AnyBase,
activity: T,
context: &LemmyContext,
) -> Result<(), LemmyError> {
) -> Result<(), LemmyError>
where
T: AsObject<Kind> + Extends<Kind> + Serialize + Debug + Send + Clone + 'static,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
// TODO: looks like call this sometimes with activity, and sometimes with any_base
insert_activity(creator.id, activity.clone(), true, context.pool()).await?;
// if this is a local community, we need to do an announce from the community instead
if community.local {
do_announce(activity, &community, creator, context).await?;
do_announce(activity.into_any_base()?, &community, creator, context).await?;
} else {
send_activity(context.client(), &activity, creator, to).await?;
}
Ok(())
}
/// Send an activity to a list of recipients, using the correct headers etc.
pub async fn send_activity(
client: &Client,
activity: &AnyBase,
actor: &dyn ActorType,
to: Vec<Url>,
) -> Result<(), LemmyError> {
if !Settings::get().federation.enabled {
return Ok(());
}
let activity = serde_json::to_string(&activity)?;
debug!("Sending activitypub activity {} to {:?}", activity, to);
for to_url in to {
check_is_apub_id_valid(&to_url)?;
let res = retry_custom(|| async {
let request = client
.post(to_url.as_str())
.header("Content-Type", "application/json");
match sign(request, actor, activity.clone()).await {
Ok(signed) => Ok(signed.send().await),
Err(e) => Err(e),
}
})
.await?;
debug!("Result for activity send: {:?}", res);
send_activity(context.activity_queue(), activity, creator, to)?;
}
Ok(())

133
server/src/apub/activity_queue.rs

@ -0,0 +1,133 @@
use crate::{
apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType},
LemmyError,
};
use activitystreams::{
base::{Extends, ExtendsExt},
object::AsObject,
};
use anyhow::{anyhow, Context, Error};
use awc::Client;
use background_jobs::{
create_server,
memory_storage::Storage,
ActixJob,
Backoff,
MaxRetries,
QueueHandle,
WorkerConfig,
};
use lemmy_utils::{location_info, settings::Settings};
use log::warn;
use serde::{Deserialize, Serialize};
use std::{future::Future, pin::Pin};
use url::Url;
pub fn send_activity<T, Kind>(
activity_sender: &QueueHandle,
activity: T,
actor: &dyn ActorType,
to: Vec<Url>,
) -> Result<(), LemmyError>
where
T: AsObject<Kind>,
T: Extends<Kind>,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
if !Settings::get().federation.enabled {
return Ok(());
}
let activity = activity.into_any_base()?;
let serialised_activity = serde_json::to_string(&activity)?;
for to_url in &to {
check_is_apub_id_valid(&to_url)?;
}
// TODO: it would make sense to create a separate task for each destination server
let message = SendActivityTask {
activity: serialised_activity,
to,
actor_id: actor.actor_id()?,
private_key: actor.private_key().context(location_info!())?,
};
activity_sender.queue::<SendActivityTask>(message)?;
Ok(())
}
#[derive(Clone, Debug, Deserialize, Serialize)]
struct SendActivityTask {
activity: String,
to: Vec<Url>,
actor_id: Url,
private_key: String,
}
impl ActixJob for SendActivityTask {
type State = MyState;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
const NAME: &'static str = "SendActivityTask";
const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
const BACKOFF: Backoff = Backoff::Exponential(2);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move {
for to_url in &self.to {
let request = state
.client
.post(to_url.as_str())
.header("Content-Type", "application/json");
// TODO: i believe we have to do the signing in here because it is only valid for a few seconds
let signed = sign(
request,
self.activity.clone(),
&self.actor_id,
self.private_key.to_owned(),
)
.await;
let signed = match signed {
Ok(s) => s,
Err(e) => {
warn!("{}", e);
// dont return an error because retrying would probably not fix the signing
return Ok(());
}
};
if let Err(e) = signed.send().await {
warn!("{}", e);
return Err(anyhow!(
"Failed to send activity {} to {}",
&self.activity,
to_url
));
}
}
Ok(())
})
}
}
pub fn create_activity_queue() -> QueueHandle {