WebSocketModule.ts 7.4 KB


  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { Types, isObjectIdOrHexString } from "mongoose";
  6. import BaseModule from "@/BaseModule";
  7. import { UniqueMethods } from "@/types/Modules";
  8. import WebSocket from "@/WebSocket";
  9. import JobContext from "@/JobContext";
  10. import Job from "@/Job";
  11. import ModuleManager from "@/ModuleManager";
  12. import JobQueue from "@/JobQueue";
  13. import DataModule from "./DataModule";
  export class WebSocketModule extends BaseModule {
    private _httpServer?: Server;

    private _wsServer?: WebSocketServer;

    private _keepAliveInterval?: ReturnType<typeof setInterval>;

    /**
     * WebSocket Module
     *
     * Registers the module as "websocket", sets the default job config to
     * "disabled" and enables only `dispatch`.
     * NOTE(review): presumably this controls which public methods are exposed
     * as jobs - confirm against BaseModule.
     */
    public constructor() {
      super("websocket");

      this._jobConfigDefault = "disabled";
      this._jobConfig = {
        dispatch: true
      };
    }

    /**
     * startup - Startup websocket module
     *
     * Creates the HTTP server on the configured port, attaches a WebSocket
     * server at "/ws" and starts the keep-alive timer (every 45 seconds).
     */
    public override async startup() {
      await super.startup();

      this._httpServer = http
        .createServer(express())
        .listen(config.get("port"));

      this._wsServer = new WebSocketServer({
        server: this._httpServer,
        path: "/ws",
        WebSocket
      });

      this._wsServer.on(
        "connection",
        (socket: WebSocket, request: IncomingMessage) => {
          // The handler is async; without a catch a failed session/user
          // lookup would surface as an unhandled promise rejection.
          this._handleConnection(socket, request).catch(error => {
            socket.log({
              type: "error",
              message: error instanceof Error ? error.message : String(error),
              data: { error }
            });
            socket.close();
          });
        }
      );

      this._keepAliveInterval = setInterval(() => {
        void this._keepAlive();
      }, 45000);

      this._wsServer.on("close", () =>
        clearInterval(this._keepAliveInterval)
      );

      await super._started();
    }

    /**
     * keepAlive - Ping open clients and terminate closed
     */
    private async _keepAlive() {
      if (!this._wsServer) return;

      // Iterate the Set itself: Set.prototype.entries() yields [value, value]
      // pairs, which would visit (and ping) every socket twice.
      for (const socket of this._wsServer.clients) {
        switch (socket.readyState) {
          case socket.OPEN:
            socket.ping();
            break;
          case socket.CLOSED:
            socket.terminate();
            break;
          default:
            break;
        }
      }
    }

    /**
     * handleConnection - Handle websocket connection
     *
     * Closes the socket if the job queue is paused. Otherwise it identifies
     * the socket by its "sec-websocket-key" header, resolves the session and
     * user from the session cookie (if present and valid), dispatches "ready"
     * with the client config and user, and starts handling messages.
     *
     * @param socket - Newly connected socket
     * @param request - HTTP upgrade request of the connection
     */
    private async _handleConnection(
      socket: WebSocket,
      request: IncomingMessage
    ) {
      if (JobQueue.getStatus().isPaused) {
        socket.close();
        return;
      }

      socket.setSocketId(request.headers["sec-websocket-key"]);

      socket.log({
        type: "debug",
        message: `WebSocket opened #${socket.getSocketId()}`
      });

      // Attach "error" and "close" listeners before the first await: an
      // "error" event emitted while no listener is registered is thrown by
      // the EventEmitter, and a "close" during the lookups would be missed.
      socket.on("error", error =>
        socket.log({
          type: "error",
          message: error.message,
          data: { error }
        })
      );

      socket.on("close", async () => {
        try {
          await JobQueue.runJob(
            "api",
            "unsubscribeAll",
            {},
            {
              socketId: socket.getSocketId()
            }
          );
        } catch (error) {
          // Listener return values are ignored, so a rejection here would
          // otherwise be unhandled.
          socket.log({
            type: "error",
            message: error instanceof Error ? error.message : String(error),
            data: { error }
          });
        }

        socket.log({
          type: "debug",
          message: `WebSocket closed #${socket.getSocketId()}`
        });
      });

      // Extract the value of the configured session cookie, if any.
      const cookieName = config.get<string>("cookie");
      const sessionCookie = request.headers.cookie
        ?.split("; ")
        .find(
          cookie => cookie.substring(0, cookie.indexOf("=")) === cookieName
        );
      const sessionId = sessionCookie?.substring(
        sessionCookie.indexOf("=") + 1
      );

      let user;

      if (sessionId && isObjectIdOrHexString(sessionId)) {
        socket.setSessionId(sessionId);

        const Session = await DataModule.getModel("sessions");

        // Looking the session up also refreshes its updatedAt timestamp.
        const session = await Session.findByIdAndUpdate(sessionId, {
          updatedAt: Date.now()
        });

        if (session) {
          const User = await DataModule.getModel("users");

          user = await User.findById(session.userId);
        }
      }

      const readyData = {
        config: {
          cookie: config.get("cookie"),
          sitename: config.get("sitename"),
          recaptcha: {
            enabled: config.get("apis.recaptcha.enabled"),
            key: config.get("apis.recaptcha.key")
          },
          githubAuthentication: config.get("apis.github.enabled"),
          messages: config.get("messages"),
          christmas: config.get("christmas"),
          footerLinks: config.get("footerLinks"),
          shortcutOverrides: config.get("shortcutOverrides"),
          registrationDisabled: config.get("registrationDisabled"),
          mailEnabled: config.get("mail.enabled"),
          discogsEnabled: config.get("apis.discogs.enabled"),
          experimental: {
            changable_listen_mode: config.get(
              "experimental.changable_listen_mode"
            ),
            media_session: config.get("experimental.media_session"),
            disable_youtube_search: config.get(
              "experimental.disable_youtube_search"
            ),
            station_history: config.get("experimental.station_history"),
            soundcloud: config.get("experimental.soundcloud"),
            spotify: config.get("experimental.spotify")
          }
        },
        user
      };

      socket.dispatch("ready", readyData);

      // Messages are only handled once "ready" has been dispatched.
      socket.on("message", message => {
        void this._handleMessage(socket, message);
      });
    }

    /**
     * handleMessage - Handle websocket message
     *
     * Expects a JSON array `[ "module.job", payload, options ]` where the
     * callback reference is read from `options` (or `payload` when no options
     * are given). Queues the job; any failure is reported to the client via
     * "jobCallback" when a callback reference is known, otherwise via "error".
     *
     * @param socket - Socket the message was received on
     * @param message - Raw message data
     */
    private async _handleMessage(socket: WebSocket, message: RawData) {
      if (JobQueue.getStatus().isPaused) {
        socket.close();
        return;
      }

      let callbackRef;

      try {
        const data: unknown = JSON.parse(message.toString());

        if (!Array.isArray(data) || data.length < 1)
          throw new Error("Invalid request");

        const [moduleJob, payload, options] = data;

        if (typeof moduleJob !== "string")
          throw new Error("Invalid request");

        const [moduleName, ...jobNameParts] = moduleJob.split(".");
        const jobName = jobNameParts.join(".");

        // Assign to the outer variable (do not redeclare it) so the catch
        // block below can route errors to the caller's callback.
        ({ callbackRef } = options ?? payload ?? {});

        if (!callbackRef)
          throw new Error(
            `No callback reference provided for job ${moduleJob}`
          );

        const module = ModuleManager.getModule(moduleName);
        if (!module) throw new Error(`Module "${moduleName}" not found`);

        const job = module.getJob(jobName);
        if (!job?.api) throw new Error(`Job "${jobName}" not found.`);

        let session;
        if (socket.getSessionId()) {
          const Session = await DataModule.getModel("sessions");

          session = await Session.findByIdAndUpdate(
            socket.getSessionId(),
            {
              updatedAt: Date.now()
            }
          );
        }

        await JobQueue.queueJob(
          moduleName,
          jobName,
          payload,
          {},
          {
            session,
            socketId: socket.getSocketId(),
            callbackRef
          }
        );
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : error;

        if (callbackRef)
          socket.dispatch("jobCallback", callbackRef, {
            status: "error",
            message: errorMessage
          });
        else socket.dispatch("error", errorMessage);
      }
    }

    /**
     * getSockets - Get websocket clients
     *
     * @returns The server's client set, or undefined if the server is not started
     */
    public async getSockets() {
      return this._wsServer?.clients;
    }

    /**
     * getSocket - Get websocket client
     *
     * Returns the first client whose socket id equals `socketId` or whose
     * session id equals `sessionId`. An omitted criterion never matches, so a
     * socket without a session is not returned just because no `sessionId`
     * was given.
     *
     * @param socketId - Socket id to look for
     * @param sessionId - Session id to look for
     * @returns The matching socket, or null if none matches
     */
    public async getSocket(socketId?: string, sessionId?: Types.ObjectId) {
      if (!this._wsServer) return null;

      // Compare session ids by their string form; `===` on an ObjectId only
      // matches the very same object instance.
      const sessionIdString = sessionId?.toString();

      for (const socket of this._wsServer.clients as Set<WebSocket>) {
        if (socketId != null && socket.getSocketId() === socketId)
          return socket;

        const socketSessionId = socket.getSessionId();

        if (
          sessionIdString !== undefined &&
          socketSessionId != null &&
          socketSessionId.toString() === sessionIdString
        )
          return socket;
      }

      return null;
    }

    /**
     * dispatch - Dispatch message to socket
     *
     * Does nothing if no socket with the given id is connected.
     *
     * @param socketId - Id of the target socket
     * @param channel - Channel to dispatch on
     * @param values - Values passed along with the dispatch
     */
    public async dispatch(socketId: string, channel: string, ...values) {
      const socket = await this.getSocket(socketId);

      if (!socket) return;

      socket.dispatch(channel, ...values);
    }

    /**
     * shutdown - Shutdown websocket module
     */
    public override async shutdown() {
      await super.shutdown();

      // Stop pinging right away instead of relying solely on the server's
      // "close" event; clearing an already cleared timer is a no-op.
      clearInterval(this._keepAliveInterval);

      if (this._httpServer) this._httpServer.close();

      if (this._wsServer) this._wsServer.close();

      await this._stopped();
    }
  }
  /**
   * Public methods of the WebSocket module, as selected by UniqueMethods.
   */
  type WebSocketModuleMethods = UniqueMethods<WebSocketModule>;

  /**
   * Job map of the WebSocket module: for every method, `payload` is the type
   * of its second parameter and `returns` is its awaited return type.
   */
  export type WebSocketModuleJobs = {
    [Property in keyof WebSocketModuleMethods]: {
      payload: Parameters<WebSocketModuleMethods[Property]>[1];
      returns: Awaited<ReturnType<WebSocketModuleMethods[Property]>>;
    };
  };
  /**
   * Single WebSocket module instance, created when this file is loaded and
   * exported as the default export.
   */
  const webSocketModule = new WebSocketModule();

  export default webSocketModule;