Hexbear is the engine that powers Chapochat. It is a customization of the Lemmy project.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
4.4 KiB

2 years ago
2 years ago
2 years ago
  1. use crate::{
  2. websocket::{
  3. chat_server::ChatServer,
  4. messages::{Connect, Disconnect, StandardMessage, WSMessage},
  5. },
  6. LemmyContext,
  7. };
  8. use actix::prelude::*;
  9. use actix_web::*;
  10. use actix_web_actors::ws;
  11. use core::ops::Deref;
  12. use lemmy_utils::get_ip;
  13. use log::{debug, error, info};
  14. use std::time::{Duration, Instant};
  15. /// How often heartbeat pings are sent
  16. const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
  17. /// How long before lack of client response causes a timeout
  18. const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
  19. /// Entry point for our route
  20. pub async fn chat_route(
  21. req: HttpRequest,
  22. stream: web::Payload,
  23. context: web::Data<LemmyContext>,
  24. ) -> Result<HttpResponse, Error> {
  25. ws::start(
  26. WSSession {
  27. cs_addr: context.chat_server().to_owned(),
  28. id: 0,
  29. hb: Instant::now(),
  30. ip: get_ip(req.connection_info().deref()),
  31. },
  32. &req,
  33. stream,
  34. )
  35. }
  36. struct WSSession {
  37. cs_addr: Addr<ChatServer>,
  38. /// unique session id
  39. id: usize,
  40. ip: String,
  41. /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
  42. /// otherwise we drop connection.
  43. hb: Instant,
  44. }
  45. impl Actor for WSSession {
  46. type Context = ws::WebsocketContext<Self>;
  47. /// Method is called on actor start.
  48. /// We register ws session with ChatServer
  49. fn started(&mut self, ctx: &mut Self::Context) {
  50. // we'll start heartbeat process on session start.
  51. self.hb(ctx);
  52. // register self in chat server. `AsyncContext::wait` register
  53. // future within context, but context waits until this future resolves
  54. // before processing any other events.
  55. // across all routes within application
  56. let addr = ctx.address();
  57. self
  58. .cs_addr
  59. .send(Connect {
  60. addr: addr.recipient(),
  61. ip: self.ip.to_owned(),
  62. })
  63. .into_actor(self)
  64. .then(|res, act, ctx| {
  65. match res {
  66. Ok(res) => act.id = res,
  67. // something is wrong with chat server
  68. _ => ctx.stop(),
  69. }
  70. actix::fut::ready(())
  71. })
  72. .wait(ctx);
  73. }
  74. fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
  75. // notify chat server
  76. self.cs_addr.do_send(Disconnect {
  77. id: self.id,
  78. ip: self.ip.to_owned(),
  79. });
  80. Running::Stop
  81. }
  82. }
  83. /// Handle messages from chat server, we simply send it to peer websocket
  84. /// These are room messages, IE sent to others in the room
  85. impl Handler<WSMessage> for WSSession {
  86. type Result = ();
  87. fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
  88. ctx.text(msg.0);
  89. }
  90. }
  91. /// WebSocket message handler
  92. impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
  93. fn handle(&mut self, result: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
  94. let message = match result {
  95. Ok(m) => m,
  96. Err(e) => {
  97. error!("{}", e);
  98. return;
  99. }
  100. };
  101. match message {
  102. ws::Message::Ping(msg) => {
  103. self.hb = Instant::now();
  104. ctx.pong(&msg);
  105. }
  106. ws::Message::Pong(_) => {
  107. self.hb = Instant::now();
  108. }
  109. ws::Message::Text(text) => {
  110. let m = text.trim().to_owned();
  111. info!("Message received: {:?} from id: {}", &m, self.id);
  112. self
  113. .cs_addr
  114. .send(StandardMessage {
  115. id: self.id,
  116. msg: m,
  117. })
  118. .into_actor(self)
  119. .then(|res, _, ctx| {
  120. match res {
  121. Ok(Ok(res)) => ctx.text(res),
  122. Ok(Err(_)) => {}
  123. Err(e) => error!("{}", &e),
  124. }
  125. actix::fut::ready(())
  126. })
  127. .spawn(ctx);
  128. }
  129. ws::Message::Binary(_bin) => info!("Unexpected binary"),
  130. ws::Message::Close(_) => {
  131. ctx.stop();
  132. }
  133. _ => {}
  134. }
  135. }
  136. }
  137. impl WSSession {
  138. /// helper method that sends ping to client every second.
  139. ///
  140. /// also this method checks heartbeats from client
  141. fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
  142. ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
  143. // check client heartbeats
  144. if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
  145. // heartbeat timed out
  146. debug!("Websocket Client heartbeat failed, disconnecting!");
  147. // notify chat server
  148. act.cs_addr.do_send(Disconnect {
  149. id: act.id,
  150. ip: act.ip.to_owned(),
  151. });
  152. // stop actor
  153. ctx.stop();
  154. // don't try to send a ping
  155. return;
  156. }
  157. ctx.ping(b"");
  158. });
  159. }
  160. }