WebSocketModule.ts 7.3 KB


  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { Types, isObjectIdOrHexString } from "mongoose";
  6. import BaseModule from "@/BaseModule";
  7. import { UniqueMethods } from "@/types/Modules";
  8. import WebSocket from "@/WebSocket";
  9. import ModuleManager from "@/ModuleManager";
  10. import JobQueue from "@/JobQueue";
  11. import DataModule from "./DataModule";
  12. export class WebSocketModule extends BaseModule {
  13. private _httpServer?: Server;
  14. private _wsServer?: WebSocketServer;
  15. private _keepAliveInterval?: NodeJS.Timer;
  16. /**
  17. * WebSocket Module
  18. */
  19. public constructor() {
  20. super("websocket");
  21. }
  22. /**
  23. * startup - Startup websocket module
  24. */
  25. public override async startup() {
  26. await super.startup();
  27. this._httpServer = http
  28. .createServer(express())
  29. .listen(config.get("port"));
  30. this._wsServer = new WebSocketServer({
  31. server: this._httpServer,
  32. path: "/ws",
  33. WebSocket
  34. });
  35. this._wsServer.on(
  36. "connection",
  37. (socket: WebSocket, request: IncomingMessage) =>
  38. this._handleConnection(socket, request)
  39. );
  40. this._keepAliveInterval = setInterval(() => this._keepAlive(), 45000);
  41. this._wsServer.on("close", async () =>
  42. clearInterval(this._keepAliveInterval)
  43. );
  44. await super._started();
  45. }
  46. /**
  47. * keepAlive - Ping open clients and terminate closed
  48. */
  49. private async _keepAlive() {
  50. if (!this._wsServer) return;
  51. for await (const clients of this._wsServer.clients.entries()) {
  52. await Promise.all(
  53. clients.map(async socket => {
  54. switch (socket.readyState) {
  55. case socket.OPEN:
  56. socket.ping();
  57. break;
  58. case socket.CLOSED:
  59. socket.terminate();
  60. break;
  61. default:
  62. break;
  63. }
  64. })
  65. );
  66. }
  67. }
  68. /**
  69. * handleConnection - Handle websocket connection
  70. */
  71. private async _handleConnection(
  72. socket: WebSocket,
  73. request: IncomingMessage
  74. ) {
  75. if (JobQueue.getStatus().isPaused) {
  76. socket.close();
  77. return;
  78. }
  79. socket.setSocketId(request.headers["sec-websocket-key"]);
  80. let sessionId;
  81. let user;
  82. if (request.headers.cookie) {
  83. sessionId = request.headers.cookie
  84. .split("; ")
  85. .find(
  86. cookie =>
  87. cookie.substring(0, cookie.indexOf("=")) ===
  88. config.get<string>("cookie")
  89. );
  90. sessionId = sessionId?.substring(
  91. sessionId.indexOf("=") + 1,
  92. sessionId.length
  93. );
  94. }
  95. if (sessionId && isObjectIdOrHexString(sessionId)) {
  96. socket.setSessionId(sessionId);
  97. const Session = await DataModule.getModel("sessions");
  98. const session = await Session.findByIdAndUpdate(sessionId, {
  99. updatedAt: Date.now()
  100. });
  101. if (session) {
  102. const User = await DataModule.getModel("users");
  103. user = await User.findById(session.userId);
  104. }
  105. }
  106. const readyData = {
  107. config: {
  108. cookie: config.get("cookie"),
  109. sitename: config.get("sitename"),
  110. recaptcha: {
  111. enabled: config.get("apis.recaptcha.enabled"),
  112. key: config.get("apis.recaptcha.key")
  113. },
  114. githubAuthentication: config.get("apis.github.enabled"),
  115. messages: config.get("messages"),
  116. christmas: config.get("christmas"),
  117. footerLinks: config.get("footerLinks"),
  118. shortcutOverrides: config.get("shortcutOverrides"),
  119. registrationDisabled: config.get("registrationDisabled"),
  120. mailEnabled: config.get("mail.enabled"),
  121. discogsEnabled: config.get("apis.discogs.enabled"),
  122. experimental: {
  123. changable_listen_mode: config.get(
  124. "experimental.changable_listen_mode"
  125. ),
  126. media_session: config.get("experimental.media_session"),
  127. disable_youtube_search: config.get(
  128. "experimental.disable_youtube_search"
  129. ),
  130. station_history: config.get("experimental.station_history"),
  131. soundcloud: config.get("experimental.soundcloud"),
  132. spotify: config.get("experimental.spotify")
  133. }
  134. },
  135. user
  136. };
  137. socket.log({
  138. type: "debug",
  139. message: `WebSocket opened #${socket.getSocketId()}`
  140. });
  141. socket.on("error", error =>
  142. socket.log({
  143. type: "error",
  144. message: error.message,
  145. data: { error }
  146. })
  147. );
  148. socket.on("close", async () => {
  149. await JobQueue.runJob(
  150. "events",
  151. "unsubscribeAll",
  152. {},
  153. {
  154. socketId: socket.getSocketId()
  155. }
  156. );
  157. socket.log({
  158. type: "debug",
  159. message: `WebSocket closed #${socket.getSocketId()}`
  160. });
  161. });
  162. socket.dispatch("ready", readyData);
  163. socket.on("message", message => this._handleMessage(socket, message));
  164. }
  165. /**
  166. * handleMessage - Handle websocket message
  167. */
  168. private async _handleMessage(socket: WebSocket, message: RawData) {
  169. if (JobQueue.getStatus().isPaused) {
  170. socket.close();
  171. return;
  172. }
  173. let callbackRef;
  174. try {
  175. const data = JSON.parse(message.toString());
  176. if (!Array.isArray(data) || data.length < 1)
  177. throw new Error("Invalid request");
  178. const [moduleJob, payload, options] = data;
  179. const [moduleName, ...jobNameParts] = moduleJob.split(".");
  180. const jobName = jobNameParts.join(".");
  181. const { callbackRef } = options ?? payload ?? {};
  182. if (!callbackRef)
  183. throw new Error(
  184. `No callback reference provided for job ${moduleJob}`
  185. );
  186. const module = ModuleManager.getModule(moduleName);
  187. if (!module) throw new Error(`Module "${moduleName}" not found`);
  188. const Job = module.getJob(jobName);
  189. if (!Job?.isApiEnabled())
  190. throw new Error(`Job "${jobName}" not found.`);
  191. let session;
  192. if (socket.getSessionId()) {
  193. const Session = await DataModule.getModel("sessions");
  194. session = await Session.findByIdAndUpdate(
  195. socket.getSessionId(),
  196. {
  197. updatedAt: Date.now()
  198. }
  199. );
  200. }
  201. await JobQueue.queueJob(
  202. moduleName,
  203. jobName,
  204. payload,
  205. {},
  206. {
  207. session,
  208. socketId: socket.getSocketId(),
  209. callbackRef
  210. }
  211. );
  212. } catch (error) {
  213. const message = error?.message ?? error;
  214. if (callbackRef)
  215. socket.dispatch("jobCallback", callbackRef, {
  216. status: "error",
  217. message
  218. });
  219. else socket.dispatch("error", message);
  220. }
  221. }
  222. /**
  223. * getSockets - Get websocket clients
  224. */
  225. public async getSockets() {
  226. return this._wsServer?.clients;
  227. }
  228. /**
  229. * getSocket - Get websocket client
  230. */
  231. public async getSocket(socketId?: string, sessionId?: Types.ObjectId) {
  232. if (!this._wsServer) return null;
  233. for (const clients of this._wsServer.clients.entries() as IterableIterator<
  234. [WebSocket, WebSocket]
  235. >) {
  236. const socket = clients.find(socket => {
  237. if (socket.getSocketId() === socketId) return true;
  238. if (socket.getSessionId() === sessionId) return true;
  239. return false;
  240. });
  241. if (socket) return socket;
  242. }
  243. return null;
  244. }
  245. /**
  246. * dispatch - Dispatch message to socket
  247. */
  248. public async dispatch(socketId: string, channel: string, ...values) {
  249. const socket = await this.getSocket(socketId);
  250. if (!socket) return;
  251. socket.dispatch(channel, ...values);
  252. }
  253. /**
  254. * shutdown - Shutdown websocket module
  255. */
  256. public override async shutdown() {
  257. await super.shutdown();
  258. if (this._httpServer) this._httpServer.close();
  259. if (this._wsServer) this._wsServer.close();
  260. await this._stopped();
  261. }
  262. }
  263. export type WebSocketModuleJobs = {
  264. [Property in keyof UniqueMethods<WebSocketModule>]: {
  265. payload: Parameters<UniqueMethods<WebSocketModule>[Property]>[1];
  266. returns: Awaited<ReturnType<UniqueMethods<WebSocketModule>[Property]>>;
  267. };
  268. };
  269. export default new WebSocketModule();