Browse Source

Adding websocket notification system.

- HTTP and APUB clients can now send live updating messages to websocket
  clients
- Rate limiting now affects both HTTP and websockets
- Rate limiting / Websocket logic is now moved into the API Perform
  functions.
- TODO This broke getting current online users, but that will have to
  wait for the perform trait to be made async.
- Fixes #446
feature/settings-cleanup
Dessalines 3 years ago
parent
commit
4755369207
  1. 776
      server/Cargo.lock
  2. 153
      server/src/api/comment.rs
  3. 231
      server/src/api/community.rs
  4. 25
      server/src/api/mod.rs
  5. 176
      server/src/api/post.rs
  6. 187
      server/src/api/site.rs
  7. 325
      server/src/api/user.rs
  8. 22
      server/src/lib.rs
  9. 20
      server/src/main.rs
  10. 18
      server/src/rate_limit/mod.rs
  11. 131
      server/src/rate_limit/rate_limiter.rs
  12. 53
      server/src/routes/api.rs
  13. 2
      server/src/routes/federation.rs
  14. 10
      server/src/routes/feeds.rs
  15. 4
      server/src/routes/index.rs
  16. 29
      server/src/routes/mod.rs
  17. 10
      server/src/routes/nodeinfo.rs
  18. 10
      server/src/routes/webfinger.rs
  19. 18
      server/src/routes/websocket.rs
  20. 23
      server/src/websocket/mod.rs
  21. 1101
      server/src/websocket/server.rs

776
server/Cargo.lock
File diff suppressed because it is too large
View File

153
server/src/api/comment.rs

@ -1,9 +1,4 @@
use super::*;
use crate::send_email;
use crate::settings::Settings;
use diesel::PgConnection;
use log::error;
use std::str::FromStr;
#[derive(Serialize, Deserialize)]
pub struct CreateComment {
@ -65,7 +60,12 @@ pub struct GetCommentsResponse {
}
impl Perform<CommentResponse> for Oper<CreateComment> {
fn perform(&self, conn: &PgConnection) -> Result<CommentResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<CommentResponse, Error> {
let data: &CreateComment = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -77,6 +77,15 @@ impl Perform<CommentResponse> for Oper<CreateComment> {
let hostname = &format!("https://{}", Settings::get().hostname);
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
// Check for a community ban
let post = Post::read(&conn, data.post_id)?;
if CommunityUserBanView::get(&conn, user_id, post.community_id).is_ok() {
@ -223,15 +232,34 @@ impl Perform<CommentResponse> for Oper<CreateComment> {
let comment_view = CommentView::read(&conn, inserted_comment.id, Some(user_id))?;
Ok(CommentResponse {
let mut res = CommentResponse {
comment: comment_view,
recipient_ids,
})
};
if let Some(ws) = websocket_info {
ws.chatserver.do_send(SendComment {
op: UserOperation::CreateComment,
comment: res.clone(),
my_id: ws.id,
});
// strip out the recipient_ids, so that
// users don't get double notifs
res.recipient_ids = Vec::new();
}
Ok(res)
}
}
impl Perform<CommentResponse> for Oper<EditComment> {
fn perform(&self, conn: &PgConnection) -> Result<CommentResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<CommentResponse, Error> {
let data: &EditComment = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -241,6 +269,15 @@ impl Perform<CommentResponse> for Oper<EditComment> {
let user_id = claims.id;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let orig_comment = CommentView::read(&conn, data.edit_id, None)?;
// You are allowed to mark the comment as read even if you're banned.
@ -353,15 +390,34 @@ impl Perform<CommentResponse> for Oper<EditComment> {
let comment_view = CommentView::read(&conn, data.edit_id, Some(user_id))?;
Ok(CommentResponse {
let mut res = CommentResponse {
comment: comment_view,
recipient_ids,
})
};
if let Some(ws) = websocket_info {
ws.chatserver.do_send(SendComment {
op: UserOperation::EditComment,
comment: res.clone(),
my_id: ws.id,
});
// strip out the recipient_ids, so that
// users don't get double notifs
res.recipient_ids = Vec::new();
}
Ok(res)
}
}
impl Perform<CommentResponse> for Oper<SaveComment> {
fn perform(&self, conn: &PgConnection) -> Result<CommentResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<CommentResponse, Error> {
let data: &SaveComment = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -376,6 +432,15 @@ impl Perform<CommentResponse> for Oper<SaveComment> {
user_id,
};
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
if data.save {
match CommentSaved::save(&conn, &comment_saved_form) {
Ok(comment) => comment,
@ -398,7 +463,12 @@ impl Perform<CommentResponse> for Oper<SaveComment> {
}
impl Perform<CommentResponse> for Oper<CreateCommentLike> {
fn perform(&self, conn: &PgConnection) -> Result<CommentResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<CommentResponse, Error> {
let data: &CreateCommentLike = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -410,6 +480,15 @@ impl Perform<CommentResponse> for Oper<CreateCommentLike> {
let mut recipient_ids = Vec::new();
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
// Don't do a downvote if site has downvotes disabled
if data.score == -1 {
let site = SiteView::read(&conn)?;
@ -467,15 +546,34 @@ impl Perform<CommentResponse> for Oper<CreateCommentLike> {
// Have to refetch the comment to get the current state
let liked_comment = CommentView::read(&conn, data.comment_id, Some(user_id))?;
Ok(CommentResponse {
let mut res = CommentResponse {
comment: liked_comment,
recipient_ids,
})
};
if let Some(ws) = websocket_info {
ws.chatserver.do_send(SendComment {
op: UserOperation::CreateCommentLike,
comment: res.clone(),
my_id: ws.id,
});
// strip out the recipient_ids, so that
// users don't get double notifs
res.recipient_ids = Vec::new();
}
Ok(res)
}
}
impl Perform<GetCommentsResponse> for Oper<GetComments> {
fn perform(&self, conn: &PgConnection) -> Result<GetCommentsResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetCommentsResponse, Error> {
let data: &GetComments = &self.data;
let user_claims: Option<Claims> = match &data.auth {
@ -494,6 +592,15 @@ impl Perform<GetCommentsResponse> for Oper<GetComments> {
let type_ = ListingType::from_str(&data.type_)?;
let sort = SortType::from_str(&data.sort)?;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let comments = match CommentQueryBuilder::create(&conn)
.listing_type(type_)
.sort(&sort)
@ -507,6 +614,20 @@ impl Perform<GetCommentsResponse> for Oper<GetComments> {
Err(_e) => return Err(APIError::err("couldnt_get_comments").into()),
};
if let Some(ws) = websocket_info {
// You don't need to join the specific community room, bc this is already handled by
// GetCommunity
if data.community_id.is_none() {
if let Some(id) = ws.id {
// 0 is the "all" community
ws.chatserver.do_send(JoinCommunityRoom {
community_id: 0,
id,
});
}
}
}
Ok(GetCommentsResponse { comments })
}
}

231
server/src/api/community.rs

@ -1,6 +1,4 @@
use super::*;
use diesel::PgConnection;
use std::str::FromStr;
#[derive(Serialize, Deserialize)]
pub struct GetCommunity {
@ -55,7 +53,7 @@ pub struct BanFromCommunity {
auth: String,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct BanFromCommunityResponse {
user: UserView,
banned: bool,
@ -69,7 +67,7 @@ pub struct AddModToCommunity {
auth: String,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct AddModToCommunityResponse {
moderators: Vec<CommunityModeratorView>,
}
@ -114,7 +112,12 @@ pub struct TransferCommunity {
}
impl Perform<GetCommunityResponse> for Oper<GetCommunity> {
fn perform(&self, conn: &PgConnection) -> Result<GetCommunityResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetCommunityResponse, Error> {
let data: &GetCommunity = &self.data;
let user_id: Option<i32> = match &data.auth {
@ -128,6 +131,15 @@ impl Perform<GetCommunityResponse> for Oper<GetCommunity> {
None => None,
};
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let community_id = match data.id {
Some(id) => id,
None => {
@ -157,18 +169,41 @@ impl Perform<GetCommunityResponse> for Oper<GetCommunity> {
let creator_user = admins.remove(creator_index);
admins.insert(0, creator_user);
// Return the jwt
Ok(GetCommunityResponse {
let online = if let Some(ws) = websocket_info {
if let Some(id) = ws.id {
ws.chatserver
.do_send(JoinCommunityRoom { community_id, id });
}
// TODO
1
// let fut = async {
// ws.chatserver.send(GetCommunityUsersOnline {community_id}).await.unwrap()
// };
// Runtime::new().unwrap().block_on(fut)
} else {
0
};
let res = GetCommunityResponse {
community: community_view,
moderators,
admins,
online: 0,
})
online,
};
// Return the jwt
Ok(res)
}
}
impl Perform<CommunityResponse> for Oper<CreateCommunity> {
fn perform(&self, conn: &PgConnection) -> Result<CommunityResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<CommunityResponse, Error> {
let data: &CreateCommunity = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -192,6 +227,15 @@ impl Perform<CommunityResponse> for Oper<CreateCommunity> {
let user_id = claims.id;
if let Some(rl) = &rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_register(&rl.ip, true)?;
}
let conn = pool.get()?;
// Check for a site ban
if UserView::read(&conn, user_id)?.banned {
return Err(APIError::err("site_ban").into());
@ -239,6 +283,13 @@ impl Perform<CommunityResponse> for Oper<CreateCommunity> {
let community_view = CommunityView::read(&conn, inserted_community.id, Some(user_id))?;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_register(&rl.ip, false)?;
}
Ok(CommunityResponse {
community: community_view,
})
@ -246,7 +297,12 @@ impl Perform<CommunityResponse> for Oper<CreateCommunity> {
}
impl Perform<CommunityResponse> for Oper<EditCommunity> {
fn perform(&self, conn: &PgConnection) -> Result<CommunityResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<CommunityResponse, Error> {
let data: &EditCommunity = &self.data;
if let Err(slurs) = slur_check(&data.name) {
@ -270,6 +326,15 @@ impl Perform<CommunityResponse> for Oper<EditCommunity> {
let user_id = claims.id;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
// Check for a site ban
if UserView::read(&conn, user_id)?.banned {
return Err(APIError::err("site_ban").into());
@ -323,14 +388,35 @@ impl Perform<CommunityResponse> for Oper<EditCommunity> {
let community_view = CommunityView::read(&conn, data.edit_id, Some(user_id))?;
Ok(CommunityResponse {
let res = CommunityResponse {
community: community_view,
})
};
if let Some(ws) = websocket_info {
// Strip out the user id and subscribed when sending to others
let mut res_sent = res.clone();
res_sent.community.user_id = None;
res_sent.community.subscribed = None;
ws.chatserver.do_send(SendCommunityRoomMessage {
op: UserOperation::EditCommunity,
response: res_sent,
community_id: data.edit_id,
my_id: ws.id,
});
}
Ok(res)
}
}
impl Perform<ListCommunitiesResponse> for Oper<ListCommunities> {
fn perform(&self, conn: &PgConnection) -> Result<ListCommunitiesResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<ListCommunitiesResponse, Error> {
let data: &ListCommunities = &self.data;
let user_claims: Option<Claims> = match &data.auth {
@ -353,6 +439,15 @@ impl Perform<ListCommunitiesResponse> for Oper<ListCommunities> {
let sort = SortType::from_str(&data.sort)?;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let communities = CommunityQueryBuilder::create(&conn)
.sort(&sort)
.for_user(user_id)
@ -367,7 +462,12 @@ impl Perform<ListCommunitiesResponse> for Oper<ListCommunities> {
}
impl Perform<CommunityResponse> for Oper<FollowCommunity> {
fn perform(&self, conn: &PgConnection) -> Result<CommunityResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<CommunityResponse, Error> {
let data: &FollowCommunity = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -382,6 +482,15 @@ impl Perform<CommunityResponse> for Oper<FollowCommunity> {
user_id,
};
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
if data.follow {
match CommunityFollower::follow(&conn, &community_follower_form) {
Ok(user) => user,
@ -403,7 +512,12 @@ impl Perform<CommunityResponse> for Oper<FollowCommunity> {
}
impl Perform<GetFollowedCommunitiesResponse> for Oper<GetFollowedCommunities> {
fn perform(&self, conn: &PgConnection) -> Result<GetFollowedCommunitiesResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetFollowedCommunitiesResponse, Error> {
let data: &GetFollowedCommunities = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -413,6 +527,15 @@ impl Perform<GetFollowedCommunitiesResponse> for Oper<GetFollowedCommunities> {
let user_id = claims.id;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let communities: Vec<CommunityFollowerView> =
match CommunityFollowerView::for_user(&conn, user_id) {
Ok(communities) => communities,
@ -425,7 +548,12 @@ impl Perform<GetFollowedCommunitiesResponse> for Oper<GetFollowedCommunities> {
}
impl Perform<BanFromCommunityResponse> for Oper<BanFromCommunity> {
fn perform(&self, conn: &PgConnection) -> Result<BanFromCommunityResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<BanFromCommunityResponse, Error> {
let data: &BanFromCommunity = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -440,6 +568,15 @@ impl Perform<BanFromCommunityResponse> for Oper<BanFromCommunity> {
user_id: data.user_id,
};
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
if data.ban {
match CommunityUserBan::ban(&conn, &community_user_ban_form) {
Ok(user) => user,
@ -470,15 +607,31 @@ impl Perform<BanFromCommunityResponse> for Oper<BanFromCommunity> {
let user_view = UserView::read(&conn, data.user_id)?;
Ok(BanFromCommunityResponse {
let res = BanFromCommunityResponse {
user: user_view,
banned: data.ban,
})
};
if let Some(ws) = websocket_info {
ws.chatserver.do_send(SendCommunityRoomMessage {
op: UserOperation::BanFromCommunity,
response: res.clone(),
community_id: data.community_id,
my_id: ws.id,
});
}
Ok(res)
}
}
impl Perform<AddModToCommunityResponse> for Oper<AddModToCommunity> {
fn perform(&self, conn: &PgConnection) -> Result<AddModToCommunityResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<AddModToCommunityResponse, Error> {
let data: &AddModToCommunity = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -493,6 +646,15 @@ impl Perform<AddModToCommunityResponse> for Oper<AddModToCommunity> {
user_id: data.user_id,
};
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
if data.added {
match CommunityModerator::join(&conn, &community_moderator_form) {
Ok(user) => user,
@ -516,12 +678,28 @@ impl Perform<AddModToCommunityResponse> for Oper<AddModToCommunity> {
let moderators = CommunityModeratorView::for_community(&conn, data.community_id)?;
Ok(AddModToCommunityResponse { moderators })
let res = AddModToCommunityResponse { moderators };
if let Some(ws) = websocket_info {
ws.chatserver.do_send(SendCommunityRoomMessage {
op: UserOperation::AddModToCommunity,
response: res.clone(),
community_id: data.community_id,
my_id: ws.id,
});
}
Ok(res)
}
}
impl Perform<GetCommunityResponse> for Oper<TransferCommunity> {
fn perform(&self, conn: &PgConnection) -> Result<GetCommunityResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetCommunityResponse, Error> {
let data: &TransferCommunity = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -531,6 +709,15 @@ impl Perform<GetCommunityResponse> for Oper<TransferCommunity> {
let user_id = claims.id;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let read_community = Community::read(&conn, data.community_id)?;
let site_creator_id = Site::read(&conn, 1)?.creator_id;

25
server/src/api/mod.rs

@ -18,12 +18,26 @@ use crate::db::user_mention_view::*;
use crate::db::user_view::*;
use crate::db::*;
use crate::{
extract_usernames, fetch_iframely_and_pictshare_data, naive_from_unix, naive_now, remove_slurs,
slur_check, slurs_vec_to_str,
extract_usernames, fetch_iframely_and_pictshare_data, generate_random_string, naive_from_unix,
naive_now, remove_slurs, send_email, slur_check, slurs_vec_to_str,
};
use crate::rate_limit::RateLimitInfo;
use crate::settings::Settings;
use crate::websocket::UserOperation;
use crate::websocket::{
server::{
JoinCommunityRoom, JoinPostRoom, JoinUserRoom, SendAllMessage, SendComment,
SendCommunityRoomMessage, SendPost, SendUserRoomMessage,
},
WebsocketInfo,
};
use diesel::r2d2::{ConnectionManager, Pool};
use diesel::PgConnection;
use failure::Error;
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
pub mod comment;
pub mod community;
@ -56,7 +70,12 @@ impl<T> Oper<T> {
}
pub trait Perform<T> {
fn perform(&self, conn: &PgConnection) -> Result<T, Error>
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<T, Error>
where
T: Sized;
}

176
server/src/api/post.rs

@ -1,6 +1,4 @@
use super::*;
use diesel::PgConnection;
use std::str::FromStr;
#[derive(Serialize, Deserialize)]
pub struct CreatePost {
@ -80,7 +78,12 @@ pub struct SavePost {
}
impl Perform<PostResponse> for Oper<CreatePost> {
fn perform(&self, conn: &PgConnection) -> Result<PostResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<PostResponse, Error> {
let data: &CreatePost = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -100,6 +103,15 @@ impl Perform<PostResponse> for Oper<CreatePost> {
let user_id = claims.id;
if let Some(rl) = &rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_post(&rl.ip, true)?;
}
let conn = pool.get()?;
// Check for a community ban
if CommunityUserBanView::get(&conn, user_id, data.community_id).is_ok() {
return Err(APIError::err("community_ban").into());
@ -164,12 +176,34 @@ impl Perform<PostResponse> for Oper<CreatePost> {
Err(_e) => return Err(APIError::err("couldnt_find_post").into()),
};
Ok(PostResponse { post: post_view })
if let Some(rl) = &rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_post(&rl.ip, false)?;
}
let res = PostResponse { post: post_view };
if let Some(ws) = websocket_info {
ws.chatserver.do_send(SendPost {
op: UserOperation::CreatePost,
post: res.clone(),
my_id: ws.id,
});
}
Ok(res)
}
}
impl Perform<GetPostResponse> for Oper<GetPost> {
fn perform(&self, conn: &PgConnection) -> Result<GetPostResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetPostResponse, Error> {
let data: &GetPost = &self.data;
let user_id: Option<i32> = match &data.auth {
@ -183,6 +217,15 @@ impl Perform<GetPostResponse> for Oper<GetPost> {
None => None,
};
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let post_view = match PostView::read(&conn, data.id, user_id) {
Ok(post) => post,
Err(_e) => return Err(APIError::err("couldnt_find_post").into()),
@ -204,6 +247,24 @@ impl Perform<GetPostResponse> for Oper<GetPost> {
let creator_user = admins.remove(creator_index);
admins.insert(0, creator_user);
let online = if let Some(ws) = websocket_info {
if let Some(id) = ws.id {
ws.chatserver.do_send(JoinPostRoom {
post_id: data.id,
id,
});
}
// TODO
1
// let fut = async {
// ws.chatserver.send(GetPostUsersOnline {post_id: data.id}).await.unwrap()
// };
// Runtime::new().unwrap().block_on(fut)
} else {
0
};
// Return the jwt
Ok(GetPostResponse {
post: post_view,
@ -211,13 +272,18 @@ impl Perform<GetPostResponse> for Oper<GetPost> {
community,
moderators,
admins,
online: 0,
online,
})
}
}
impl Perform<GetPostsResponse> for Oper<GetPosts> {
fn perform(&self, conn: &PgConnection) -> Result<GetPostsResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetPostsResponse, Error> {
let data: &GetPosts = &self.data;
let user_claims: Option<Claims> = match &data.auth {
@ -241,6 +307,15 @@ impl Perform<GetPostsResponse> for Oper<GetPosts> {
let type_ = ListingType::from_str(&data.type_)?;
let sort = SortType::from_str(&data.sort)?;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let posts = match PostQueryBuilder::create(&conn)
.listing_type(type_)
.sort(&sort)
@ -255,12 +330,31 @@ impl Perform<GetPostsResponse> for Oper<GetPosts> {
Err(_e) => return Err(APIError::err("couldnt_get_posts").into()),
};
if let Some(ws) = websocket_info {
// You don't need to join the specific community room, bc this is already handled by
// GetCommunity
if data.community_id.is_none() {
if let Some(id) = ws.id {
// 0 is the "all" community
ws.chatserver.do_send(JoinCommunityRoom {
community_id: 0,
id,
});
}
}
}
Ok(GetPostsResponse { posts })
}
}
impl Perform<PostResponse> for Oper<CreatePostLike> {
fn perform(&self, conn: &PgConnection) -> Result<PostResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<PostResponse, Error> {
let data: &CreatePostLike = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -270,6 +364,15 @@ impl Perform<PostResponse> for Oper<CreatePostLike> {
let user_id = claims.id;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
// Don't do a downvote if site has downvotes disabled
if data.score == -1 {
let site = SiteView::read(&conn)?;
@ -312,13 +415,27 @@ impl Perform<PostResponse> for Oper<CreatePostLike> {
Err(_e) => return Err(APIError::err("couldnt_find_post").into()),
};
// just output the score
Ok(PostResponse { post: post_view })
let res = PostResponse { post: post_view };
if let Some(ws) = websocket_info {
ws.chatserver.do_send(SendPost {
op: UserOperation::CreatePostLike,
post: res.clone(),
my_id: ws.id,
});
}
Ok(res)
}
}
impl Perform<PostResponse> for Oper<EditPost> {
fn perform(&self, conn: &PgConnection) -> Result<PostResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<PostResponse, Error> {
let data: &EditPost = &self.data;
if let Err(slurs) = slur_check(&data.name) {
@ -338,6 +455,15 @@ impl Perform<PostResponse> for Oper<EditPost> {
let user_id = claims.id;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
// Verify its the creator or a mod or admin
let mut editors: Vec<i32> = vec![data.creator_id];
editors.append(
@ -427,12 +553,27 @@ impl Perform<PostResponse> for Oper<EditPost> {
let post_view = PostView::read(&conn, data.edit_id, Some(user_id))?;
Ok(PostResponse { post: post_view })
let res = PostResponse { post: post_view };
if let Some(ws) = websocket_info {
ws.chatserver.do_send(SendPost {
op: UserOperation::EditPost,
post: res.clone(),
my_id: ws.id,
});
}
Ok(res)
}
}
impl Perform<PostResponse> for Oper<SavePost> {
fn perform(&self, conn: &PgConnection) -> Result<PostResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<PostResponse, Error> {
let data: &SavePost = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -447,6 +588,15 @@ impl Perform<PostResponse> for Oper<SavePost> {
user_id,
};
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
if data.save {
match PostSaved::save(&conn, &post_saved_form) {
Ok(post) => post,

187
server/src/api/site.rs

@ -1,10 +1,5 @@
use super::user::Register;
use super::*;
use crate::api::user::Register;
use crate::api::{Oper, Perform};
use crate::settings::Settings;
use diesel::PgConnection;
use log::info;
use std::str::FromStr;
#[derive(Serialize, Deserialize)]
pub struct ListCategories {}
@ -78,7 +73,7 @@ pub struct EditSite {
#[derive(Serialize, Deserialize)]
pub struct GetSite {}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct SiteResponse {
site: SiteView,
}
@ -114,9 +109,23 @@ pub struct SaveSiteConfig {
}
impl Perform<ListCategoriesResponse> for Oper<ListCategories> {
fn perform(&self, conn: &PgConnection) -> Result<ListCategoriesResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<ListCategoriesResponse, Error> {
let _data: &ListCategories = &self.data;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let categories: Vec<Category> = Category::list_all(&conn)?;
// Return the jwt
@ -125,9 +134,23 @@ impl Perform<ListCategoriesResponse> for Oper<ListCategories> {
}
impl Perform<GetModlogResponse> for Oper<GetModlog> {
fn perform(&self, conn: &PgConnection) -> Result<GetModlogResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetModlogResponse, Error> {
let data: &GetModlog = &self.data;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let removed_posts = ModRemovePostView::list(
&conn,
data.community_id,
@ -198,7 +221,12 @@ impl Perform<GetModlogResponse> for Oper<GetModlog> {
}
impl Perform<SiteResponse> for Oper<CreateSite> {
fn perform(&self, conn: &PgConnection) -> Result<SiteResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<SiteResponse, Error> {
let data: &CreateSite = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -218,6 +246,15 @@ impl Perform<SiteResponse> for Oper<CreateSite> {
let user_id = claims.id;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
// Make sure user is an admin
if !UserView::read(&conn, user_id)?.admin {
return Err(APIError::err("not_an_admin").into());
@ -245,7 +282,12 @@ impl Perform<SiteResponse> for Oper<CreateSite> {
}
impl Perform<SiteResponse> for Oper<EditSite> {
fn perform(&self, conn: &PgConnection) -> Result<SiteResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<SiteResponse, Error> {
let data: &EditSite = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -265,6 +307,15 @@ impl Perform<SiteResponse> for Oper<EditSite> {
let user_id = claims.id;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
// Make sure user is an admin
if !UserView::read(&conn, user_id)?.admin {
return Err(APIError::err("not_an_admin").into());
@ -289,14 +340,39 @@ impl Perform<SiteResponse> for Oper<EditSite> {
let site_view = SiteView::read(&conn)?;
Ok(SiteResponse { site: site_view })
let res = SiteResponse { site: site_view };
if let Some(ws) = websocket_info {
ws.chatserver.do_send(SendAllMessage {
op: UserOperation::EditSite,
response: res.clone(),
my_id: ws.id,
});
}
Ok(res)
}
}
impl Perform<GetSiteResponse> for Oper<GetSite> {
fn perform(&self, conn: &PgConnection) -> Result<GetSiteResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetSiteResponse, Error> {
let _data: &GetSite = &self.data;
if let Some(rl) = &rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
// TODO refactor this a little
let site = Site::read(&conn, 1);
let site_view = if site.is_ok() {
Some(SiteView::read(&conn)?)
@ -309,7 +385,11 @@ impl Perform<GetSiteResponse> for Oper<GetSite> {
admin: true,
show_nsfw: true,
};
let login_response = Oper::new(register).perform(&conn)?;
let login_response = Oper::new(register).perform(
pool.clone(),
websocket_info.clone(),
rate_limit_info.clone(),
)?;
info!("Admin {} created", setup.admin_username);
let create_site = CreateSite {
@ -320,7 +400,7 @@ impl Perform<GetSiteResponse> for Oper<GetSite> {
enable_nsfw: false,
auth: login_response.jwt,
};
Oper::new(create_site).perform(&conn)?;
Oper::new(create_site).perform(pool, websocket_info.clone(), rate_limit_info)?;
info!("Site {} created", setup.site_name);
Some(SiteView::read(&conn)?)
} else {
@ -337,17 +417,33 @@ impl Perform<GetSiteResponse> for Oper<GetSite> {
let banned = UserView::banned(&conn)?;
let online = if let Some(_ws) = websocket_info {
// TODO
1
// let fut = async {
// ws.chatserver.send(GetUsersOnline).await.unwrap()
// };
// Runtime::new().unwrap().block_on(fut)
} else {
0
};
Ok(GetSiteResponse {
site: site_view,
admins,
banned,
online: 0,
online,
})
}
}
impl Perform<SearchResponse> for Oper<Search> {
fn perform(&self, conn: &PgConnection) -> Result<SearchResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<SearchResponse, Error> {
let data: &Search = &self.data;
let user_id: Option<i32> = match &data.auth {
@ -371,6 +467,15 @@ impl Perform<SearchResponse> for Oper<Search> {
// TODO no clean / non-nsfw searching rn
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
match type_ {
SearchType::Posts => {
posts = PostQueryBuilder::create(&conn)
@ -465,7 +570,12 @@ impl Perform<SearchResponse> for Oper<Search> {
}
impl Perform<GetSiteResponse> for Oper<TransferSite> {
fn perform(&self, conn: &PgConnection) -> Result<GetSiteResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetSiteResponse, Error> {
let data: &TransferSite = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -475,6 +585,15 @@ impl Perform<GetSiteResponse> for Oper<TransferSite> {
let user_id = claims.id;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
let read_site = Site::read(&conn, 1)?;
// Make sure user is the creator
@ -528,7 +647,12 @@ impl Perform<GetSiteResponse> for Oper<TransferSite> {
}
impl Perform<GetSiteConfigResponse> for Oper<GetSiteConfig> {
fn perform(&self, conn: &PgConnection) -> Result<GetSiteConfigResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetSiteConfigResponse, Error> {
let data: &GetSiteConfig = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -538,6 +662,15 @@ impl Perform<GetSiteConfigResponse> for Oper<GetSiteConfig> {
let user_id = claims.id;
if let Some(rl) = rate_limit_info {
rl.rate_limiter
.lock()
.unwrap()
.check_rate_limit_message(&rl.ip, false)?;
}
let conn = pool.get()?;
// Only let admins read this
let admins = UserView::admins(&conn)?;
let admin_ids: Vec<i32> = admins.into_iter().map(|m| m.id).collect();
@ -553,7 +686,12 @@ impl Perform<GetSiteConfigResponse> for Oper<GetSiteConfig> {
}
impl Perform<GetSiteConfigResponse> for Oper<SaveSiteConfig> {
fn perform(&self, conn: &PgConnection) -> Result<GetSiteConfigResponse, Error> {
fn perform(
&self,
pool: Pool<ConnectionManager<PgConnection>>,
_websocket_info: Option<WebsocketInfo>,
rate_limit_info: Option<RateLimitInfo>,
) -> Result<GetSiteConfigResponse, Error> {
let data: &SaveSiteConfig = &self.data;
let claims = match Claims::decode(&data.auth) {
@ -563,6 +701,15 @@ impl Perform<GetSiteConfigResponse> for Oper<SaveSiteConfig> {
let user_id = claims.id;