WebSocketModule.ts 7.8 KB


  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { Types, isObjectIdOrHexString } from "mongoose";
  6. import { forEachIn } from "@common/utils/forEachIn";
  7. import { getErrorMessage } from "@common/utils/getErrorMessage";
  8. import BaseModule from "@/BaseModule";
  9. import WebSocket from "@/WebSocket";
  10. import ModuleManager from "@/ModuleManager";
  11. import JobQueue from "@/JobQueue";
  12. import DataModule from "./DataModule";
  13. import { UserModel } from "./DataModule/models/users/schema";
  14. import { SessionModel } from "./DataModule/models/sessions/schema";
  15. import EventsModule from "./EventsModule";
  16. // import assertEventDerived from "@/utils/assertEventDerived";
  17. export class WebSocketModule extends BaseModule {
  18. private _httpServer?: Server;
  19. private _wsServer?: WebSocketServer;
  20. private _keepAliveInterval?: NodeJS.Timer;
  21. /**
  22. * WebSocket Module
  23. */
  24. public constructor() {
  25. super("websocket");
  26. this._dependentModules = ["data", "events"];
  27. }
  28. /**
  29. * startup - Startup websocket module
  30. */
  31. public override async startup() {
  32. await super.startup();
  33. this._httpServer = http
  34. .createServer(express())
  35. .listen(config.get("port"));
  36. this._wsServer = new WebSocketServer({
  37. server: this._httpServer,
  38. path: "/ws",
  39. WebSocket
  40. });
  41. this._wsServer.on(
  42. "connection",
  43. (socket: WebSocket, request: IncomingMessage) =>
  44. this._handleConnection(socket, request)
  45. );
  46. this._keepAliveInterval = setInterval(() => this._keepAlive(), 45000);
  47. this._wsServer.on("close", async () =>
  48. clearInterval(this._keepAliveInterval)
  49. );
  50. await EventsModule.pSubscribe("events.job.completed:*", async event => {
  51. // assertEventDerived(event);
  52. const data = event.getData();
  53. const { socketId, callbackRef } = data;
  54. if (!socketId || !callbackRef) return;
  55. delete data.socketId;
  56. delete data.callbackRef;
  57. this.dispatch(socketId, "jobCallback", callbackRef, data);
  58. });
  59. await super._started();
  60. }
  61. /**
  62. * keepAlive - Ping open clients and terminate closed
  63. */
  64. private async _keepAlive() {
  65. if (!this._wsServer) return;
  66. for await (const clients of this._wsServer.clients.entries()) {
  67. await forEachIn(clients, async socket => {
  68. switch (socket.readyState) {
  69. case socket.OPEN:
  70. socket.ping();
  71. break;
  72. case socket.CLOSED:
  73. socket.terminate();
  74. break;
  75. default:
  76. break;
  77. }
  78. });
  79. }
  80. }
  81. /**
  82. * handleConnection - Handle websocket connection
  83. */
  84. private async _handleConnection(
  85. socket: WebSocket,
  86. request: IncomingMessage
  87. ) {
  88. if (JobQueue.getStatus().isPaused) {
  89. socket.close();
  90. return;
  91. }
  92. socket.setSocketId(request.headers["sec-websocket-key"]);
  93. let sessionId;
  94. let user;
  95. if (request.headers.cookie) {
  96. sessionId = request.headers.cookie
  97. .split("; ")
  98. .find(
  99. cookie =>
  100. cookie.substring(0, cookie.indexOf("=")) ===
  101. config.get<string>("cookie")
  102. );
  103. sessionId = sessionId?.substring(
  104. sessionId.indexOf("=") + 1,
  105. sessionId.length
  106. );
  107. }
  108. if (sessionId && isObjectIdOrHexString(sessionId)) {
  109. socket.setSessionId(sessionId);
  110. const Session = await DataModule.getModel("sessions");
  111. const session = await Session.findByIdAndUpdate(sessionId, {
  112. updatedAt: Date.now()
  113. });
  114. if (session) {
  115. const User = await DataModule.getModel<UserModel>("users");
  116. user = await User.findById(session.userId);
  117. }
  118. }
  119. const readyData = {
  120. config: {
  121. cookie: config.get("cookie"),
  122. sitename: config.get("sitename"),
  123. recaptcha: {
  124. enabled: config.get("apis.recaptcha.enabled"),
  125. key: config.get("apis.recaptcha.key")
  126. },
  127. githubAuthentication: config.get("apis.github.enabled"),
  128. messages: config.get("messages"),
  129. christmas: config.get("christmas"),
  130. footerLinks: config.get("footerLinks"),
  131. shortcutOverrides: config.get("shortcutOverrides"),
  132. registrationDisabled: config.get("registrationDisabled"),
  133. mailEnabled: config.get("mail.enabled"),
  134. discogsEnabled: config.get("apis.discogs.enabled"),
  135. experimental: {
  136. changable_listen_mode: config.get(
  137. "experimental.changable_listen_mode"
  138. ),
  139. media_session: config.get("experimental.media_session"),
  140. disable_youtube_search: config.get(
  141. "experimental.disable_youtube_search"
  142. ),
  143. station_history: config.get("experimental.station_history"),
  144. soundcloud: config.get("experimental.soundcloud"),
  145. spotify: config.get("experimental.spotify")
  146. }
  147. },
  148. user
  149. };
  150. socket.log({
  151. type: "debug",
  152. message: `WebSocket opened #${socket.getSocketId()}`
  153. });
  154. socket.on("error", error =>
  155. socket.log({
  156. type: "error",
  157. message: error.message,
  158. data: { error }
  159. })
  160. );
  161. socket.on("close", async () => {
  162. const socketId = socket.getSocketId();
  163. const Job = EventsModule.getJob("unsubscribeAll");
  164. await JobQueue.runJob(Job, null, {
  165. socketId
  166. });
  167. socket.log({
  168. type: "debug",
  169. message: `WebSocket closed #${socketId}`
  170. });
  171. });
  172. socket.dispatch("ready", readyData);
  173. socket.on("message", message => this._handleMessage(socket, message));
  174. }
  175. /**
  176. * handleMessage - Handle websocket message
  177. */
  178. private async _handleMessage(socket: WebSocket, message: RawData) {
  179. if (JobQueue.getStatus().isPaused) {
  180. socket.close();
  181. return;
  182. }
  183. let callbackRef;
  184. try {
  185. const data = JSON.parse(message.toString());
  186. if (!Array.isArray(data) || data.length < 1)
  187. throw new Error("Invalid request");
  188. const [moduleJob, payload, options] = data;
  189. const moduleName = moduleJob.substring(0, moduleJob.indexOf("."));
  190. const jobName = moduleJob.substring(moduleJob.indexOf(".") + 1);
  191. const { callbackRef } = options ?? payload ?? {};
  192. if (!callbackRef)
  193. throw new Error(
  194. `No callback reference provided for job ${moduleJob}`
  195. );
  196. const module = ModuleManager.getModule(moduleName);
  197. if (!module) throw new Error(`Module "${moduleName}" not found`);
  198. const Job = module.getJob(jobName);
  199. if (!Job?.isApiEnabled())
  200. throw new Error(`Job "${jobName}" not found.`);
  201. let session;
  202. if (socket.getSessionId()) {
  203. const Session = await DataModule.getModel<SessionModel>(
  204. "sessions"
  205. );
  206. session = await Session.findByIdAndUpdate(
  207. socket.getSessionId(),
  208. {
  209. updatedAt: Date.now()
  210. }
  211. );
  212. if (!session) throw new Error("Session not found.");
  213. }
  214. await JobQueue.queueJob(Job, payload, {
  215. session,
  216. socketId: socket.getSocketId(),
  217. callbackRef
  218. });
  219. } catch (error) {
  220. const message = getErrorMessage(error);
  221. if (callbackRef)
  222. socket.dispatch("jobCallback", callbackRef, {
  223. status: "error",
  224. message
  225. });
  226. else socket.dispatch("error", message);
  227. }
  228. }
  229. /**
  230. * getSockets - Get websocket clients
  231. */
  232. public async getSockets() {
  233. return this._wsServer?.clients;
  234. }
  235. /**
  236. * getSocket - Get websocket client
  237. */
  238. public async getSocket(socketId?: string, sessionId?: Types.ObjectId) {
  239. if (!this._wsServer) return null;
  240. for (const clients of this._wsServer.clients.entries() as IterableIterator<
  241. [WebSocket, WebSocket]
  242. >) {
  243. const socket = clients.find(socket => {
  244. if (socket.getSocketId() === socketId) return true;
  245. if (socket.getSessionId() === sessionId) return true;
  246. return false;
  247. });
  248. if (socket) return socket;
  249. }
  250. return null;
  251. }
  252. /**
  253. * dispatch - Dispatch message to socket
  254. */
  255. public async dispatch(
  256. socketId: string,
  257. channel: string,
  258. ...values: unknown[]
  259. ) {
  260. const socket = await this.getSocket(socketId);
  261. if (!socket) return;
  262. socket.dispatch(channel, ...values);
  263. }
  264. /**
  265. * shutdown - Shutdown websocket module
  266. */
  267. public override async shutdown() {
  268. await super.shutdown();
  269. if (this._httpServer) this._httpServer.close();
  270. if (this._wsServer) this._wsServer.close();
  271. await this._stopped();
  272. }
  273. }
  274. export default new WebSocketModule();