Browse Source

Federation async (#848)

* Asyncify more

* I guess these changed

* Clean PR a bit

* Convert more away from failure error

* config changes for testing federation

* It was DNS

So actix-web's client relies on TRust DNS Resolver to figure out
where to send data, but TRust DNS Resolver seems to not play nice
with docker, which expressed itself as not resolving the name to
an IP address _the first time_ when making a request. The fix was
literally to make the request again (which I limited to 3 times
total, and not exceeding the request timeout in total)

* Only retry for connecterror

Since TRust DNS Resolver was causing ConnectError::Timeout,
this change limits the retry to only this error, returning
immediately for any other error

* Use http sig norm 0.4.0-alpha for actix-web 3.0 support

* Blocking function, retry http requests

* cargo +nightly fmt

* Only create one pictrs dir

* Don't yarn build

* cargo +nightly fmt
feature/settings-cleanup
Riley 2 years ago
committed by GitHub
parent
commit
48a1c13a9a
  1. 2
      docker/dev/docker-compose.yml
  2. 14
      docker/federation-test/run-tests.sh
  3. 19
      docker/federation-test/servers.sh
  4. 10
      docker/federation-test/tests.sh
  5. 2
      docker/federation/Dockerfile
  6. 56
      docker/federation/docker-compose.yml
  7. 12
      docker/federation/nginx.conf
  8. 567
      server/Cargo.lock
  9. 15
      server/Cargo.toml
  10. 310
      server/src/api/comment.rs
  11. 387
      server/src/api/community.rs
  12. 20
      server/src/api/mod.rs
  13. 280
      server/src/api/post.rs
  14. 424
      server/src/api/site.rs
  15. 623
      server/src/api/user.rs
  16. 59
      server/src/apub/activities.rs
  17. 281
      server/src/apub/comment.rs
  18. 204
      server/src/apub/community.rs
  19. 81
      server/src/apub/community_inbox.rs
  20. 8
      server/src/apub/extensions/group_extensions.rs
  21. 87
      server/src/apub/extensions/signatures.rs
  22. 304
      server/src/apub/fetcher.rs
  23. 185
      server/src/apub/mod.rs
  24. 233
      server/src/apub/post.rs
  25. 117
      server/src/apub/private_message.rs
  26. 877
      server/src/apub/shared_inbox.rs
  27. 87
      server/src/apub/user.rs
  28. 208
      server/src/apub/user_inbox.rs
  29. 24
      server/src/db/activity.rs
  30. 15
      server/src/db/code_migrations.rs
  31. 2
      server/src/db/comment.rs
  32. 2
      server/src/db/community.rs
  33. 8
      server/src/db/password_reset_request.rs
  34. 2
      server/src/db/user.rs
  35. 118
      server/src/lib.rs
  36. 25
      server/src/main.rs
  37. 19
      server/src/rate_limit/mod.rs
  38. 5
      server/src/rate_limit/rate_limiter.rs
  39. 51
      server/src/request.rs
  40. 14
      server/src/routes/api.rs
  41. 84
      server/src/routes/feeds.rs
  42. 62
      server/src/routes/nodeinfo.rs
  43. 108
      server/src/routes/webfinger.rs
  44. 6
      server/src/settings.rs
  45. 1
      server/src/websocket/mod.rs
  46. 565
      server/src/websocket/server.rs

2
docker/dev/docker-compose.yml

@ -20,6 +20,8 @@ services:
postgres:
image: postgres:12-alpine
ports:
- "127.0.0.1:5432:5432"
environment:
- POSTGRES_USER=lemmy
- POSTGRES_PASSWORD=password

14
docker/federation-test/run-tests.sh

@ -5,17 +5,21 @@ pushd ../../server/
cargo build
popd
pushd ../../ui
yarn
popd
mkdir -p volumes/pictrs_{alpha,beta,gamma}
sudo chown -R 991:991 volumes/pictrs_{alpha,beta,gamma}
sudo docker build ../../ --file ../federation/Dockerfile --tag lemmy-federation:latest
for Item in alpha beta gamma ; do
sudo mkdir -p volumes/pictrs_$Item
sudo chown -R 991:991 volumes/pictrs_$Item
done
sudo mkdir -p volumes/pictrs_alpha
sudo chown -R 991:991 volumes/pictrs_alpha
sudo docker-compose --file ../federation/docker-compose.yml --project-directory . up -d
pushd ../../ui
yarn
echo "Waiting for Lemmy to start..."
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8540/api/v1/site')" != "200" ]]; do sleep 1; done
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8550/api/v1/site')" != "200" ]]; do sleep 1; done

19
docker/federation-test/servers.sh

@ -0,0 +1,19 @@
#!/bin/bash
set -e
sudo rm -rf volumes
pushd ../../server/
cargo build
popd
pushd ../../ui
yarn
popd
mkdir -p volumes/pictrs_{alpha,beta,gamma}
sudo chown -R 991:991 volumes/pictrs_{alpha,beta,gamma}
sudo docker build ../../ --file ../federation/Dockerfile --tag lemmy-federation:latest
sudo docker-compose --file ../federation/docker-compose.yml --project-directory . up

10
docker/federation-test/tests.sh

@ -0,0 +1,10 @@
#!/bin/bash
set -xe
pushd ../../ui
echo "Waiting for Lemmy to start..."
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8540/api/v1/site')" != "200" ]]; do sleep 1; done
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8550/api/v1/site')" != "200" ]]; do sleep 1; done
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8560/api/v1/site')" != "200" ]]; do sleep 1; done
yarn api-test || true
popd

2
docker/federation/Dockerfile

@ -3,7 +3,7 @@ FROM ekidd/rust-musl-builder:1.42.0-openssl11
USER root
RUN mkdir /app/dist/documentation/ -p \
&& addgroup --gid 1001 lemmy \
&& adduser --disabled-password --shell /bin/sh -u 1001 --ingroup lemmy lemmy
&& adduser --gecos "" --disabled-password --shell /bin/sh -u 1001 --ingroup lemmy lemmy
# Copy resources
COPY server/config/defaults.hjson /app/config/defaults.hjson

56
docker/federation/docker-compose.yml

@ -12,28 +12,33 @@ services:
- ../federation/nginx.conf:/etc/nginx/nginx.conf
restart: on-failure
depends_on:
- lemmy_alpha
- pictrs_alpha
- lemmy_beta
- pictrs_beta
- lemmy_gamma
- pictrs_gamma
- lemmy-alpha
- pictrs
- lemmy-beta
- lemmy-gamma
- iframely
lemmy_alpha:
pictrs:
restart: always
image: asonix/pictrs:v0.1.13-r0
user: 991:991
volumes:
- ./volumes/pictrs_alpha:/mnt
lemmy-alpha:
image: lemmy-federation:latest
environment:
- LEMMY_HOSTNAME=lemmy_alpha:8540
- LEMMY_HOSTNAME=lemmy-alpha:8540
- LEMMY_DATABASE_URL=postgres://lemmy:[email protected]_alpha:5432/lemmy
- LEMMY_JWT_SECRET=changeme
- LEMMY_FRONT_END_DIR=/app/dist
- LEMMY_FEDERATION__ENABLED=true
- LEMMY_FEDERATION__TLS_ENABLED=false
- LEMMY_FEDERATION__ALLOWED_INSTANCES=lemmy_beta,lemmy_gamma
- LEMMY_FEDERATION__ALLOWED_INSTANCES=lemmy-beta,lemmy-gamma
- LEMMY_PORT=8540
- LEMMY_SETUP__ADMIN_USERNAME=lemmy_alpha
- LEMMY_SETUP__ADMIN_PASSWORD=lemmy
- LEMMY_SETUP__SITE_NAME=lemmy_alpha
- LEMMY_SETUP__SITE_NAME=lemmy-alpha
- RUST_BACKTRACE=1
- RUST_LOG=debug
depends_on:
@ -46,26 +51,21 @@ services:
- POSTGRES_DB=lemmy
volumes:
- ./volumes/postgres_alpha:/var/lib/postgresql/data
pictrs_alpha:
image: asonix/pictrs:v0.1.13-r0
user: 991:991
volumes:
- ./volumes/pictrs_alpha:/mnt
lemmy_beta:
lemmy-beta:
image: lemmy-federation:latest
environment:
- LEMMY_HOSTNAME=lemmy_beta:8550
- LEMMY_HOSTNAME=lemmy-beta:8550
- LEMMY_DATABASE_URL=postgres://lemmy:[email protected]_beta:5432/lemmy
- LEMMY_JWT_SECRET=changeme
- LEMMY_FRONT_END_DIR=/app/dist
- LEMMY_FEDERATION__ENABLED=true
- LEMMY_FEDERATION__TLS_ENABLED=false
- LEMMY_FEDERATION__ALLOWED_INSTANCES=lemmy_alpha,lemmy_gamma
- LEMMY_FEDERATION__ALLOWED_INSTANCES=lemmy-alpha,lemmy-gamma
- LEMMY_PORT=8550
- LEMMY_SETUP__ADMIN_USERNAME=lemmy_beta
- LEMMY_SETUP__ADMIN_PASSWORD=lemmy
- LEMMY_SETUP__SITE_NAME=lemmy_beta
- LEMMY_SETUP__SITE_NAME=lemmy-beta
- RUST_BACKTRACE=1
- RUST_LOG=debug
depends_on:
@ -78,26 +78,21 @@ services:
- POSTGRES_DB=lemmy
volumes:
- ./volumes/postgres_beta:/var/lib/postgresql/data
pictrs_beta:
image: asonix/pictrs:v0.1.13-r0
user: 991:991
volumes:
- ./volumes/pictrs_beta:/mnt
lemmy_gamma:
lemmy-gamma:
image: lemmy-federation:latest
environment:
- LEMMY_HOSTNAME=lemmy_gamma:8560
- LEMMY_HOSTNAME=lemmy-gamma:8560
- LEMMY_DATABASE_URL=postgres://lemmy:[email protected]_gamma:5432/lemmy
- LEMMY_JWT_SECRET=changeme
- LEMMY_FRONT_END_DIR=/app/dist
- LEMMY_FEDERATION__ENABLED=true
- LEMMY_FEDERATION__TLS_ENABLED=false
- LEMMY_FEDERATION__ALLOWED_INSTANCES=lemmy_alpha,lemmy_beta
- LEMMY_FEDERATION__ALLOWED_INSTANCES=lemmy-alpha,lemmy-beta
- LEMMY_PORT=8560
- LEMMY_SETUP__ADMIN_USERNAME=lemmy_gamma
- LEMMY_SETUP__ADMIN_PASSWORD=lemmy
- LEMMY_SETUP__SITE_NAME=lemmy_gamma
- LEMMY_SETUP__SITE_NAME=lemmy-gamma
- RUST_BACKTRACE=1
- RUST_LOG=debug
depends_on:
@ -110,11 +105,6 @@ services:
- POSTGRES_DB=lemmy
volumes:
- ./volumes/postgres_gamma:/var/lib/postgresql/data
pictrs_gamma:
image: asonix/pictrs:v0.1.13-r0
user: 991:991
volumes:
- ./volumes/pictrs_gamma:/mnt
iframely:
image: dogbin/iframely:latest

12
docker/federation/nginx.conf

@ -12,7 +12,7 @@ http {
client_max_body_size 50M;
location / {
proxy_pass http://lemmy_alpha:8540;
proxy_pass http://lemmy-alpha:8540;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
@ -26,7 +26,7 @@ http {
# pict-rs images
location /pictrs {
location /pictrs/image {
proxy_pass http://pictrs_alpha:8080/image;
proxy_pass http://pictrs:8080/image;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
@ -52,7 +52,7 @@ http {
client_max_body_size 50M;
location / {
proxy_pass http://lemmy_beta:8550;
proxy_pass http://lemmy-beta:8550;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
@ -66,7 +66,7 @@ http {
# pict-rs images
location /pictrs {
location /pictrs/image {
proxy_pass http://pictrs_beta:8080/image;
proxy_pass http://pictrs:8080/image;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
@ -92,7 +92,7 @@ http {
client_max_body_size 50M;
location / {
proxy_pass http://lemmy_gamma:8560;
proxy_pass http://lemmy-gamma:8560;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
@ -106,7 +106,7 @@ http {
# pict-rs images
location /pictrs {
location /pictrs/image {
proxy_pass http://pictrs_gamma:8080/image;
proxy_pass http://pictrs:8080/image;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

567
server/Cargo.lock
File diff suppressed because it is too large
View File

15
server/Cargo.toml

@ -19,11 +19,12 @@ chrono = { version = "0.4.7", features = ["serde"] }
serde_json = { version = "1.0.52", features = ["preserve_order"]}
failure = "0.1.8"
serde = { version = "1.0.105", features = ["derive"] }
actix = "0.9.0"
actix-web = "2.0.0"
actix-files = "0.2.1"
actix-web-actors = "2.0.0"
actix = "0.10.0-alpha.2"
actix-web = { version = "3.0.0-alpha.3", features = ["rustls"] }
actix-files = "0.3.0-alpha.1"
actix-web-actors = "3.0.0-alpha.1"
actix-rt = "1.1.1"
awc = "2.0.0-alpha.2"
log = "0.4.0"
env_logger = "0.7.1"
rand = "0.7.3"
@ -34,19 +35,19 @@ regex = "1.3.5"
lazy_static = "1.3.0"
lettre = "0.9.3"
lettre_email = "0.9.4"
sha2 = "0.8.1"
rss = "1.9.0"
htmlescape = "0.3.1"
url = { version = "2.1.1", features = ["serde"] }
config = {version = "0.10.1", default-features = false, features = ["hjson"] }
percent-encoding = "2.1.0"
isahc = "0.9.2"
comrak = "0.7"
openssl = "0.10"
http = "0.2.1"
http-signature-normalization = "0.5.1"
http-signature-normalization-actix = { version = "0.4.0-alpha.0", default-features = false, features = ["sha-2"] }
base64 = "0.12.1"
tokio = "0.2.21"
futures = "0.3.5"
itertools = "0.9.0"
uuid = { version = "0.8", features = ["serde", "v4"] }
sha2 = "0.9"
async-trait = "0.1.36"

310
server/src/api/comment.rs

@ -1,6 +1,7 @@
use crate::{
api::{APIError, Oper, Perform},
apub::{ApubLikeableType, ApubObjectType},
blocking,
db::{
comment::*,
comment_view::*,
@ -27,13 +28,10 @@ use crate::{
UserOperation,
WebsocketInfo,
},
DbPool,
LemmyError,
MentionData,
};
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use failure::Error;
use log::error;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
@ -97,14 +95,15 @@ pub struct GetCommentsResponse {
comments: Vec<CommentView>,
}
#[async_trait::async_trait(?Send)]
impl Perform for Oper<CreateComment> {
type Response = CommentResponse;
fn perform(
async fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
) -> Result<CommentResponse, Error> {
) -> Result<CommentResponse, LemmyError> {
let data: &CreateComment = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -114,20 +113,6 @@ impl Perform for Oper<CreateComment> {
let user_id = claims.id;
let conn = pool.get()?;
// Check for a community ban
let post = Post::read(&conn, data.post_id)?;
if CommunityUserBanView::get(&conn, user_id, post.community_id).is_ok() {
return Err(APIError::err("community_ban").into());
}
// Check for a site ban
let user = User_::read(&conn, user_id)?;
if user.banned {
return Err(APIError::err("site_ban").into());
}
let content_slurs_removed = remove_slurs(&data.content.to_owned());
let comment_form = CommentForm {
@ -144,21 +129,48 @@ impl Perform for Oper<CreateComment> {
local: true,
};
let inserted_comment = match Comment::create(&conn, &comment_form) {
Ok(comment) => comment,
Err(_e) => return Err(APIError::err("couldnt_create_comment").into()),
};
// Check for a community ban
let post_id = data.post_id;
let post = blocking(pool, move |conn| Post::read(conn, post_id)).await??;
let community_id = post.community_id;
let is_banned =
move |conn: &'_ _| CommunityUserBanView::get(conn, user_id, community_id).is_ok();
if blocking(pool, is_banned).await? {
return Err(APIError::err("community_ban").into());
}
let updated_comment = match Comment::update_ap_id(&conn, inserted_comment.id) {
// Check for a site ban
let user = blocking(pool, move |conn| User_::read(&conn, user_id)).await??;
if user.banned {
return Err(APIError::err("site_ban").into());
}
let comment_form2 = comment_form.clone();
let inserted_comment =
match blocking(pool, move |conn| Comment::create(&conn, &comment_form2)).await? {
Ok(comment) => comment,
Err(_e) => return Err(APIError::err("couldnt_create_comment").into()),
};
let inserted_comment_id = inserted_comment.id;
let updated_comment: Comment = match blocking(pool, move |conn| {
Comment::update_ap_id(&conn, inserted_comment_id)
})
.await?
{
Ok(comment) => comment,
Err(_e) => return Err(APIError::err("couldnt_create_comment").into()),
};
updated_comment.send_create(&user, &conn)?;
updated_comment
.send_create(&user, &self.client, pool)
.await?;
// Scan the comment for user mentions, add those rows
let mentions = scrape_text_for_mentions(&comment_form.content);
let recipient_ids = send_local_notifs(&conn, &mentions, &updated_comment, &user, &post);
let recipient_ids =
send_local_notifs(mentions, updated_comment.clone(), user.clone(), post, pool).await?;
// You like your own comment by default
let like_form = CommentLikeForm {
@ -168,14 +180,17 @@ impl Perform for Oper<CreateComment> {
score: 1,
};
let _inserted_like = match CommentLike::like(&conn, &like_form) {
Ok(like) => like,
Err(_e) => return Err(APIError::err("couldnt_like_comment").into()),
};
let like = move |conn: &'_ _| CommentLike::like(&conn, &like_form);
if blocking(pool, like).await?.is_err() {
return Err(APIError::err("couldnt_like_comment").into());
}
updated_comment.send_like(&user, &conn)?;
updated_comment.send_like(&user, &self.client, pool).await?;
let comment_view = CommentView::read(&conn, inserted_comment.id, Some(user_id))?;
let comment_view = blocking(pool, move |conn| {
CommentView::read(&conn, inserted_comment.id, Some(user_id))
})
.await??;
let mut res = CommentResponse {
comment: comment_view,
@ -198,14 +213,15 @@ impl Perform for Oper<CreateComment> {
}
}
#[async_trait::async_trait(?Send)]
impl Perform for Oper<EditComment> {
type Response = CommentResponse;
fn perform(
async fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
) -> Result<CommentResponse, Error> {
) -> Result<CommentResponse, LemmyError> {
let data: &EditComment = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -215,30 +231,44 @@ impl Perform for Oper<EditComment> {
let user_id = claims.id;
let conn = pool.get()?;
let user = blocking(pool, move |conn| User_::read(&conn, user_id)).await??;
let user = User_::read(&conn, user_id)?;
let orig_comment = CommentView::read(&conn, data.edit_id, None)?;
let edit_id = data.edit_id;
let orig_comment =
blocking(pool, move |conn| CommentView::read(&conn, edit_id, None)).await??;
// You are allowed to mark the comment as read even if you're banned.
if data.read.is_none() {
// Verify its the creator or a mod, or an admin
let mut editors: Vec<i32> = vec![data.creator_id];
let community_id = orig_comment.community_id;
editors.append(
&mut blocking(pool, move |conn| {
Ok(
CommunityModeratorView::for_community(&conn, community_id)?
.into_iter()
.map(|m| m.user_id)
.collect(),
) as Result<_, LemmyError>
})
.await??,
);
editors.append(
&mut CommunityModeratorView::for_community(&conn, orig_comment.community_id)?
.into_iter()
.map(|m| m.user_id)
.collect(),
&mut blocking(pool, move |conn| {
Ok(UserView::admins(conn)?.into_iter().map(|a| a.id).collect()) as Result<_, LemmyError>
})
.await??,
);
editors.append(&mut UserView::admins(&conn)?.into_iter().map(|a| a.id).collect());
if !editors.contains(&user_id) {
return Err(APIError::err("no_comment_edit_allowed").into());
}
// Check for a community ban
if CommunityUserBanView::get(&conn, user_id, orig_comment.community_id).is_ok() {
let community_id = orig_comment.community_id;
let is_banned =
move |conn: &'_ _| CommunityUserBanView::get(conn, user_id, community_id).is_ok();
if blocking(pool, is_banned).await? {
return Err(APIError::err("community_ban").into());
}
@ -250,7 +280,8 @@ impl Perform for Oper<EditComment> {
let content_slurs_removed = remove_slurs(&data.content.to_owned());
let read_comment = Comment::read(&conn, data.edit_id)?;
let edit_id = data.edit_id;
let read_comment = blocking(pool, move |conn| Comment::read(conn, edit_id)).await??;
let comment_form = CommentForm {
content: content_slurs_removed,
@ -270,31 +301,48 @@ impl Perform for Oper<EditComment> {
local: read_comment.local,
};
let updated_comment = match Comment::update(&conn, data.edit_id, &comment_form) {
let edit_id = data.edit_id;
let comment_form2 = comment_form.clone();
let updated_comment = match blocking(pool, move |conn| {
Comment::update(conn, edit_id, &comment_form2)
})
.await?
{
Ok(comment) => comment,
Err(_e) => return Err(APIError::err("couldnt_update_comment").into()),
};
if let Some(deleted) = data.deleted.to_owned() {
if deleted {
updated_comment.send_delete(&user, &conn)?;
updated_comment
.send_delete(&user, &self.client, pool)
.await?;
} else {
updated_comment.send_undo_delete(&user, &conn)?;
updated_comment
.send_undo_delete(&user, &self.client, pool)
.await?;
}
} else if let Some(removed) = data.removed.to_owned() {
if removed {
updated_comment.send_remove(&user, &conn)?;
updated_comment
.send_remove(&user, &self.client, pool)
.await?;
} else {
updated_comment.send_undo_remove(&user, &conn)?;
updated_comment
.send_undo_remove(&user, &self.client, pool)
.await?;
}
} else {
updated_comment.send_update(&user, &conn)?;
updated_comment
.send_update(&user, &self.client, pool)
.await?;
}
let post = Post::read(&conn, data.post_id)?;
let post_id = data.post_id;
let post = blocking(pool, move |conn| Post::read(conn, post_id)).await??;
let mentions = scrape_text_for_mentions(&comment_form.content);
let recipient_ids = send_local_notifs(&conn, &mentions, &updated_comment, &user, &post);
let recipient_ids = send_local_notifs(mentions, updated_comment, user, post, pool).await?;
// Mod tables
if let Some(removed) = data.removed.to_owned() {
@ -304,10 +352,14 @@ impl Perform for Oper<EditComment> {
removed: Some(removed),
reason: data.reason.to_owned(),
};
ModRemoveComment::create(&conn, &form)?;
blocking(pool, move |conn| ModRemoveComment::create(conn, &form)).await??;
}
let comment_view = CommentView::read(&conn, data.edit_id, Some(user_id))?;
let edit_id = data.edit_id;
let comment_view = blocking(pool, move |conn| {
CommentView::read(conn, edit_id, Some(user_id))
})
.await??;
let mut res = CommentResponse {
comment: comment_view,
@ -330,14 +382,15 @@ impl Perform for Oper<EditComment> {
}
}
#[async_trait::async_trait(?Send)]
impl Perform for Oper<SaveComment> {
type Response = CommentResponse;
fn perform(
async fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
) -> Result<CommentResponse, Error> {
) -> Result<CommentResponse, LemmyError> {
let data: &SaveComment = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -352,21 +405,23 @@ impl Perform for Oper<SaveComment> {
user_id,
};
let conn = pool.get()?;
if data.save {
match CommentSaved::save(&conn, &comment_saved_form) {
Ok(comment) => comment,
Err(_e) => return Err(APIError::err("couldnt_save_comment").into()),
};
let save_comment = move |conn: &'_ _| CommentSaved::save(conn, &comment_saved_form);
if blocking(pool, save_comment).await?.is_err() {
return Err(APIError::err("couldnt_save_comment").into());
}
} else {
match CommentSaved::unsave(&conn, &comment_saved_form) {
Ok(comment) => comment,
Err(_e) => return Err(APIError::err("couldnt_save_comment").into()),
};
let unsave_comment = move |conn: &'_ _| CommentSaved::unsave(conn, &comment_saved_form);
if blocking(pool, unsave_comment).await?.is_err() {
return Err(APIError::err("couldnt_save_comment").into());
}
}
let comment_view = CommentView::read(&conn, data.comment_id, Some(user_id))?;
let comment_id = data.comment_id;
let comment_view = blocking(pool, move |conn| {
CommentView::read(conn, comment_id, Some(user_id))
})
.await??;
Ok(CommentResponse {
comment: comment_view,
@ -375,14 +430,15 @@ impl Perform for Oper<SaveComment> {
}
}
#[async_trait::async_trait(?Send)]
impl Perform for Oper<CreateCommentLike> {
type Response = CommentResponse;
fn perform(
async fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
) -> Result<CommentResponse, Error> {
) -> Result<CommentResponse, LemmyError> {
let data: &CreateCommentLike = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -394,36 +450,42 @@ impl Perform for Oper<CreateCommentLike> {
let mut recipient_ids = Vec::new();
let conn = pool.get()?;
// Don't do a downvote if site has downvotes disabled
if data.score == -1 {
let site = SiteView::read(&conn)?;
let site = blocking(pool, move |conn| SiteView::read(conn)).await??;
if !site.enable_downvotes {
return Err(APIError::err("downvotes_disabled").into());
}
}
// Check for a community ban
let post = Post::read(&conn, data.post_id)?;
if CommunityUserBanView::get(&conn, user_id, post.community_id).is_ok() {
let post_id = data.post_id;
let post = blocking(pool, move |conn| Post::read(conn, post_id)).await??;
let community_id = post.community_id;
let is_banned =
move |conn: &'_ _| CommunityUserBanView::get(conn, user_id, community_id).is_ok();
if blocking(pool, is_banned).await? {
return Err(APIError::err("community_ban").into());
}
// Check for a site ban
let user = User_::read(&conn, user_id)?;
let user = blocking(pool, move |conn| User_::read(conn, user_id)).await??;
if user.banned {
return Err(APIError::err("site_ban").into());
}
let comment = Comment::read(&conn, data.comment_id)?;
let comment_id = data.comment_id;
let comment = blocking(pool, move |conn| Comment::read(conn, comment_id)).await??;
// Add to recipient ids
match comment.parent_id {
Some(parent_id) => {
let parent_comment = Comment::read(&conn, parent_id)?;
let parent_comment = blocking(pool, move |conn| Comment::read(conn, parent_id)).await??;
if parent_comment.creator_id != user_id {
let parent_user = User_::read(&conn, parent_comment.creator_id)?;
let parent_user = blocking(pool, move |conn| {
User_::read(conn, parent_comment.creator_id)
})
.await??;
recipient_ids.push(parent_user.id);
}
}
@ -440,27 +502,33 @@ impl Perform for Oper<CreateCommentLike> {
};
// Remove any likes first
CommentLike::remove(&conn, &like_form)?;
let like_form2 = like_form.clone();
blocking(pool, move |conn| CommentLike::remove(conn, &like_form2)).await??;
// Only add the like if the score isnt 0
let do_add = like_form.score != 0 && (like_form.score == 1 || like_form.score == -1);
if do_add {
let _inserted_like = match CommentLike::like(&conn, &like_form) {
Ok(like) => like,
Err(_e) => return Err(APIError::err("couldnt_like_comment").into()),
};
let like_form2 = like_form.clone();
let like = move |conn: &'_ _| CommentLike::like(conn, &like_form2);
if blocking(pool, like).await?.is_err() {
return Err(APIError::err("couldnt_like_comment").into());
}
if like_form.score == 1 {
comment.send_like(&user, &conn)?;
comment.send_like(&user, &self.client, pool).await?;
} else if like_form.score == -1 {
comment.send_dislike(&user, &conn)?;
comment.send_dislike(&user, &self.client, pool).await?;
}
} else {
comment.send_undo_like(&user, &conn)?;
comment.send_undo_like(&user, &self.client, pool).await?;
}
// Have to refetch the comment to get the current state
let liked_comment = CommentView::read(&conn, data.comment_id, Some(user_id))?;
let comment_id = data.comment_id;
let liked_comment = blocking(pool, move |conn| {
CommentView::read(conn, comment_id, Some(user_id))
})
.await??;
let mut res = CommentResponse {
comment: liked_comment,
@ -483,14 +551,15 @@ impl Perform for Oper<CreateCommentLike> {
}
}
#[async_trait::async_trait(?Send)]
impl Perform for Oper<GetComments> {
type Response = GetCommentsResponse;
fn perform(
async fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
) -> Result<GetCommentsResponse, Error> {
) -> Result<GetCommentsResponse, LemmyError> {
let data: &GetComments = &self.data;
let user_claims: Option<Claims> = match &data.auth {
@ -509,19 +578,23 @@ impl Perform for Oper<GetComments> {
let type_ = ListingType::from_str(&data.type_)?;
let sort = SortType::from_str(&data.sort)?;
let conn = pool.get()?;
let comments = match CommentQueryBuilder::create(&conn)
.listing_type(type_)
.sort(&sort)
.for_community_id(data.community_id)
.my_user_id(user_id)
.page(data.page)
.limit(data.limit)
.list()
{
let community_id = data.community_id;
let page = data.page;
let limit = data.limit;
let comments = blocking(pool, move |conn| {
CommentQueryBuilder::create(conn)
.listing_type(type_)
.sort(&sort)
.for_community_id(community_id)
.my_user_id(user_id)
.page(page)
.limit(limit)
.list()
})
.await?;
let comments = match comments {
Ok(comments) => comments,
Err(_e) => return Err(APIError::err("couldnt_get_comments").into()),
Err(_) => return Err(APIError::err("couldnt_get_comments").into()),
};
if let Some(ws) = websocket_info {
@ -542,8 +615,23 @@ impl Perform for Oper<GetComments> {
}
}
pub fn send_local_notifs(
conn: &PgConnection,
pub async fn send_local_notifs(
mentions: Vec<MentionData>,
comment: Comment,
user: User_,
post: Post,
pool: &DbPool,
) -> Result<Vec<i32>, LemmyError> {
let ids = blocking(pool, move |conn| {
do_send_local_notifs(conn, &mentions, &comment, &user, &post)
})
.await?;
Ok(ids)
}
fn do_send_local_notifs(
conn: &diesel::PgConnection,
mentions: &[MentionData],
comment: &Comment,
user: &User_,

387
server/src/api/community.rs

@ -7,6 +7,7 @@ use crate::{
ActorType,
EndpointType,
},
blocking,
db::{Bannable, Crud, Followable, Joinable, SortType},
is_valid_community_name,
naive_from_unix,
@ -18,12 +19,9 @@ use crate::{
UserOperation,
WebsocketInfo,
},
DbPool,
LemmyError,
};
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use failure::Error;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
@ -138,14 +136,15 @@ pub struct TransferCommunity {
auth: String,
}
#[async_trait::async_trait(?Send)]
impl Perform for Oper<GetCommunity> {
type Response = GetCommunityResponse;
fn perform(
async fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
) -> Result<GetCommunityResponse, Error> {
) -> Result<GetCommunityResponse, LemmyError> {
let data: &GetCommunity = &self.data;
let user_id: Option<i32> = match &data.auth {
@ -159,33 +158,38 @@ impl Perform for Oper<GetCommunity> {
None => None,
};
let conn = pool.get()?;
let name = data.name.to_owned().unwrap_or_else(|| "main".to_string());
let community = match data.id {
Some(id) => Community::read(&conn, id)?,
None => {
match Community::read_from_name(
&conn,
&data.name.to_owned().unwrap_or_else(|| "main".to_string()),
) {
Ok(community) => community,
Err(_e) => return Err(APIError::err("couldnt_find_community").into()),
}
}
Some(id) => blocking(pool, move |conn| Community::read(conn, id)).await??,
None => match blocking(pool, move |conn| Community::read_from_name(conn, &name)).await? {
Ok(community) => community,
Err(_e) => return Err(APIError::err("couldnt_find_community").into()),
},
};
let community_view = match CommunityView::read(&conn, community.id, user_id) {
let community_id = community.id;
let community_view = match blocking(pool, move |conn| {
CommunityView::read(conn, community_id, user_id)
})
.await?
{
Ok(community) => community,
Err(_e) => return Err(APIError::err("couldnt_find_community").into()),
};
let moderators = match CommunityModeratorView::for_community(&conn, community.id) {
let community_id = community.id;
let moderators: Vec<CommunityModeratorView> = match blocking(pool, move |conn| {
CommunityModeratorView::for_community(conn, community_id)
})
.await?
{
Ok(moderators) => moderators,
Err(_e) => return Err(APIError::err("couldnt_find_community").into()),
};
let site_creator_id = Site::read(&conn, 1)?.creator_id;
let mut admins = UserView::admins(&conn)?;
let site = blocking(pool, move |conn| Site::read(conn, 1)).await??;
let site_creator_id = site.creator_id;
let mut admins = blocking(pool, move |conn| UserView::admins(conn)).await??;
let creator_index = admins.iter().position(|r| r.id == site_creator_id).unwrap();
let creator_user = admins.remove(creator_index);
admins.insert(0, creator_user);
@ -220,14 +224,15 @@ impl Perform for Oper<GetCommunity> {
}
}
#[async_trait::async_trait(?Send)]
impl Perform for Oper<CreateCommunity> {
type Response = CommunityResponse;
fn perform(
async fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
) -> Result<CommunityResponse, Error> {
) -> Result<CommunityResponse, LemmyError> {
let data: &CreateCommunity = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -255,10 +260,9 @@ impl Perform for Oper<CreateCommunity> {
let user_id = claims.id;
let conn = pool.get()?;
// Check for a site ban
if UserView::read(&conn, user_id)?.banned {
let user_view = blocking(pool, move |conn| UserView::read(conn, user_id)).await??;
if user_view.banned {
return Err(APIError::err("site_ban").into());
}
@ -283,34 +287,36 @@ impl Perform for Oper<CreateCommunity> {
published: None,
};
let inserted_community = match Community::create(&conn, &community_form) {
Ok(community) => community,
Err(_e) => return Err(APIError::err("community_already_exists").into()),
};
let inserted_community =
match blocking(pool, move |conn| Community::create(conn, &community_form)).await? {
Ok(community) => community,
Err(_e) => return Err(APIError::err("community_already_exists").into()),
};
let community_moderator_form = CommunityModeratorForm {
community_id: inserted_community.id,
user_id,
};
let _inserted_community_moderator =
match CommunityModerator::join(&conn, &community_moderator_form) {
Ok(user) => user,
Err(_e) => return Err(APIError::err("community_moderator_already_exists").into()),
};
let join = move |conn: &'_ _| CommunityModerator::join(conn, &community_moderator_form);
if blocking(pool, join).await?.is_err() {
return Err(APIError::err("community_moderator_already_exists").into());
}
let community_follower_form = CommunityFollowerForm {
community_id: inserted_community.id,
user_id,
};
let _inserted_community_follower =
match CommunityFollower::follow(&conn, &community_follower_form) {
Ok(user) => user,
Err(_e) => return Err(APIError::err("community_follower_already_exists").into()),
};
let follow = move |conn: &'_ _| CommunityFollower::follow(conn, &community_follower_form);
if blocking(pool, follow).await?.is_err() {
return Err(APIError::err("community_follower_already_exists").into());
}
let community_view = CommunityView::read(&conn, inserted_community.id, Some(user_id))?;
let community_view = blocking(pool, move |conn| {
CommunityView::read(conn, inserted_community.id, Some(user_id))
})
.await??;
Ok(CommunityResponse {
community: community_view,
@ -318,14 +324,15 @@ impl Perform for Oper<CreateCommunity> {
}
}
#[async_trait::async_trait(?Send)]
impl Perform for Oper<EditCommunity> {
type Response = CommunityResponse;
fn perform(
async fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
pool: &DbPool,
websocket_info: Option<WebsocketInfo>,
) -> Result<CommunityResponse, Error> {
) -> Result<CommunityResponse, LemmyError> {
let data: &EditCommunity = &self.data;
if let Err(slurs) = slur_check(&data.name) {
@ -353,28 +360,34 @@ impl Perform for Oper<EditCommunity> {
let user_id = claims.id;
let conn = pool.get()?;
// Check for a site ban
let user = User_::read(&conn, user_id)?;
let user = blocking(pool, move |conn| User_::read(conn, user_id)).await??;
if user.banned {
return Err(APIError::err("site_ban").into());
}
// Verify its a mod
let edit_id = data.edit_id;
let mut editors: Vec<i32> = Vec::new();
editors.append(
&mut CommunityModeratorView::for_community(&conn, data.edit_id)?
.into_iter()
.map(|m| m.user_id)
.collect(),
&mut blocking(pool, move |conn| {
CommunityModeratorView::for_community(conn, edit_id)
.map(|v| v.into_iter().map(|m| m.user_id).collect())
})
.await??,
);
editors.append(
&mut blocking(pool, move |conn| {
UserView::admins(conn).map(|v| v.into_iter().map(|a| a.id).collect())
})
.await??,
);
editors.append(&mut UserView::admins(&conn)?.into_iter().map(|a| a.id).collect());
if !editors.contains(&user_id) {
return Err(APIError::err("no_community_edit_allowed").into());
}
let read_community = Community::read(&conn, data.edit_id)?;
let edit_id = data.edit_id;
let read_community = blocking(pool, move |conn| Community::read(conn, edit_id)).await??;
let community_form = CommunityForm {
name: data.name.to_owned(),
@ -394,7 +407,12 @@ impl Perform for Oper<EditCommunity> {
published: None,
};
let updated_community = match Community::update(&conn, data.edit_id, &community_form) {
let edit_id = data.edit_id;
let updated_community = match blocking(pool, move |conn| {
Community::update(conn, edit_id, &community_form)
})
.await?
{
Ok(community) => community,
Err(_e) => return Err(APIError::err("couldnt_update_community").into()),
};
@ -412,24 +430,36 @@ impl Perform for Oper<EditCommunity> {
reason: data.reason.to_owned(),
expires,
};
ModRemoveCommunity::create(&conn, &form)?;
blocking(pool, move |conn| ModRemoveCommunity::create(conn, &form)).await??;
}
if let Some(deleted) = data.deleted.to_owned() {
if deleted {
updated_community.send_delete(&user, &conn)?;
updated_community
.send_delete(&user, &self.client, pool)
.await?;
} else {
updated_community.send_undo_delete(&user, &conn)?;
updated_community
.send_undo_delete(&user, &self.client, pool)
.await?;
}
} else if let Some(removed) = data.removed.to_owned() {
if removed {
updated_community.send_remove(&user, &conn)?;
updated_community
.send_remove(&user, &self.client, pool)
.await?;
} else {
updated_community.send_undo_remove(&user, &conn)?;
updated_community
.send_undo_remove(&user, &self.client, pool)
.await?;
}
}
let community_view = CommunityView::read(&conn, data.edit_id, Some(user_id))?;
let edit_id = data.edit_id;
let community_view = blocking(pool, move |conn| {
CommunityView::read(conn, edit_id, Some(user_id))
})
.await??;
let res = CommunityResponse {
community: community_view,
@ -453,14 +483,15 @@ impl Perform for Oper<EditCommunity> {
}
}
#[async_trait::async_trait(?Send)]
impl Perform for Oper<ListCommunities> {
type Response = ListCommunitiesResponse;
fn perform(
async fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
) -> Result<ListCommunitiesResponse, Error> {
) -> Result<ListCommunitiesResponse, LemmyError> {
let data: &ListCommunities = &self.data;
let user_claims: Option<Claims> = match &data.auth {
@ -483,29 +514,33 @@ impl Perform for Oper<ListCommunities> {
let sort = SortType::from_str(&data.sort)?;
let conn = pool.get()?;
let communities = CommunityQueryBuilder::create(&conn)
.sort(&sort)
.for_user(user_id)
.show_nsfw(show_nsfw)
.page(data.page)
.limit(data.limit)
.list()?;
let page = data.page;
let limit = data.limit;
let communities = blocking(pool, move |conn| {
CommunityQueryBuilder::create(conn)
.sort(&sort)
.for_user(user_id)
.show_nsfw(show_nsfw)
.page(page)
.limit(limit)
.list()
})
.await??;
// Return the jwt
Ok(ListCommunitiesResponse { communities })
}
}
#[async_trait::async_trait(?Send)]
impl Perform for Oper<FollowCommunity> {
type Response = CommunityResponse;
fn perform(
async fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
pool: &DbPool,
_websocket_info: Option<WebsocketInfo>,
) -> Result<CommunityResponse, Error> {
) -> Result<CommunityResponse, LemmyError> {
let data: &FollowCommunity = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -515,9 +550,8 @@ impl Perform for Oper<FollowCommunity> {
let user_id = claims.id;
let conn = pool.get()?;
let community = Community::read(&conn, data.community_id)?;
let community_id = data.community_id;
let community = blocking(pool, move |conn| Community::read(conn, community_id)).await??;
let community_follower_form = CommunityFollowerForm {
community_id: data.community_id,
user_id,
@ -525,34 +559,44 @@ impl Perform for Oper<FollowCommunity> {
if community.local {
if data.follow {
match CommunityFollower::follow(&conn, &community_follower_form) {
Ok(user) => user,
Err(_e) => return Err(APIError::err("community_follower_already_exists").into()),
};
let follow = move |conn: &'_ _| CommunityFollower::follow(conn, &community_follower_form);
if blocking(pool, follow).await?.is_err() {
return Err(APIError::err("community_follower_already_exists").into());
}
} else {
match CommunityFollower::unfollow(&conn, &community_follower_form) {
Ok(user) => user,
Err(_e) => return Err(APIError::err("community_follower_already_exists").into()),
};
let unfollow =
move |conn: &'_ _| CommunityFollower::unfollow(conn, &community_follower_form);
if blocking(pool, unfollow).await?.is_err() {
return Err(APIError::err("community_follower_already_exists").into());
}
}
} else {
let user = User_::read(&conn, user_id)?;
let user = blocking(pool, move |conn| User_::read(conn, user_id)).await??;
if data.follow {
// Dont actually add to the community followers here, because you need
// to wait for the accept
user.send_follow(&community.actor_id, &conn)?;
user
.send_follow(&community.actor_id, &self.client, pool)
.await?;
} else {
user.send_unfollow(&community.actor_id, &conn)?;
match CommunityFollower::unfollow(&conn, &community_follower_form) {
Ok(user) => user,
Err(_e) => return Err(APIError::err("community_follower_already_exists").into()),
};
user
.send_unfollow(&community.actor_id, &self.client, pool)
.await?;
let unfollow =
move |conn: &'_ _| CommunityFollower::unfollow(conn, &community_follower_form);
if blocking(pool, unfollow).await?.is_err() {
return Err(APIError::err("community_follower_already_exists").into());
}
}
// TODO: this needs to return a "pending" state, until Accept is received from the remote server
}
let community_view = CommunityView::read(&conn, data.community_id, Some(user_id))?;
let community_id = data.community_id;