WebSocketModule.ts 7.8 KB


  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { Types, isObjectIdOrHexString } from "mongoose";
  6. import { forEachIn } from "@common/utils/forEachIn";
  7. import { getErrorMessage } from "@common/utils/getErrorMessage";
  8. import BaseModule from "@/BaseModule";
  9. import WebSocket from "@/WebSocket";
  10. import ModuleManager from "@/ModuleManager";
  11. import JobQueue from "@/JobQueue";
  12. import DataModule from "./DataModule";
  13. import { UserModel } from "./DataModule/models/users/schema";
  14. import { SessionModel } from "./DataModule/models/sessions/schema";
  15. import EventsModule from "./EventsModule";
  16. export class WebSocketModule extends BaseModule {
  17. private _httpServer?: Server;
  18. private _wsServer?: WebSocketServer;
  19. private _keepAliveInterval?: NodeJS.Timer;
  20. /**
  21. * WebSocket Module
  22. */
  23. public constructor() {
  24. super("websocket");
  25. this._dependentModules = ["data", "events"];
  26. }
  27. /**
  28. * startup - Startup websocket module
  29. */
  30. public override async startup() {
  31. await super.startup();
  32. this._httpServer = http
  33. .createServer(express())
  34. .listen(config.get("port"));
  35. this._wsServer = new WebSocketServer({
  36. server: this._httpServer,
  37. path: "/ws",
  38. WebSocket
  39. });
  40. this._wsServer.on(
  41. "connection",
  42. (socket: WebSocket, request: IncomingMessage) =>
  43. this._handleConnection(socket, request)
  44. );
  45. this._keepAliveInterval = setInterval(() => this._keepAlive(), 45000);
  46. this._wsServer.on("close", async () =>
  47. clearInterval(this._keepAliveInterval)
  48. );
  49. await EventsModule.pSubscribe("job.*", async payload => {
  50. if (
  51. !payload ||
  52. typeof payload !== "object" ||
  53. Array.isArray(payload)
  54. )
  55. return;
  56. const { socketId, callbackRef } = payload;
  57. if (!socketId || !callbackRef) return;
  58. delete payload.socketId;
  59. delete payload.callbackRef;
  60. this.dispatch(socketId, "jobCallback", callbackRef, payload);
  61. });
  62. await super._started();
  63. }
  64. /**
  65. * keepAlive - Ping open clients and terminate closed
  66. */
  67. private async _keepAlive() {
  68. if (!this._wsServer) return;
  69. for await (const clients of this._wsServer.clients.entries()) {
  70. await forEachIn(clients, async socket => {
  71. switch (socket.readyState) {
  72. case socket.OPEN:
  73. socket.ping();
  74. break;
  75. case socket.CLOSED:
  76. socket.terminate();
  77. break;
  78. default:
  79. break;
  80. }
  81. });
  82. }
  83. }
  84. /**
  85. * handleConnection - Handle websocket connection
  86. */
  87. private async _handleConnection(
  88. socket: WebSocket,
  89. request: IncomingMessage
  90. ) {
  91. if (JobQueue.getStatus().isPaused) {
  92. socket.close();
  93. return;
  94. }
  95. socket.setSocketId(request.headers["sec-websocket-key"]);
  96. let sessionId;
  97. let user;
  98. if (request.headers.cookie) {
  99. sessionId = request.headers.cookie
  100. .split("; ")
  101. .find(
  102. cookie =>
  103. cookie.substring(0, cookie.indexOf("=")) ===
  104. config.get<string>("cookie")
  105. );
  106. sessionId = sessionId?.substring(
  107. sessionId.indexOf("=") + 1,
  108. sessionId.length
  109. );
  110. }
  111. if (sessionId && isObjectIdOrHexString(sessionId)) {
  112. socket.setSessionId(sessionId);
  113. const Session = await DataModule.getModel("sessions");
  114. const session = await Session.findByIdAndUpdate(sessionId, {
  115. updatedAt: Date.now()
  116. });
  117. if (session) {
  118. const User = await DataModule.getModel<UserModel>("users");
  119. user = await User.findById(session.userId);
  120. }
  121. }
  122. const readyData = {
  123. config: {
  124. cookie: config.get("cookie"),
  125. sitename: config.get("sitename"),
  126. recaptcha: {
  127. enabled: config.get("apis.recaptcha.enabled"),
  128. key: config.get("apis.recaptcha.key")
  129. },
  130. githubAuthentication: config.get("apis.github.enabled"),
  131. messages: config.get("messages"),
  132. christmas: config.get("christmas"),
  133. footerLinks: config.get("footerLinks"),
  134. shortcutOverrides: config.get("shortcutOverrides"),
  135. registrationDisabled: config.get("registrationDisabled"),
  136. mailEnabled: config.get("mail.enabled"),
  137. discogsEnabled: config.get("apis.discogs.enabled"),
  138. experimental: {
  139. changable_listen_mode: config.get(
  140. "experimental.changable_listen_mode"
  141. ),
  142. media_session: config.get("experimental.media_session"),
  143. disable_youtube_search: config.get(
  144. "experimental.disable_youtube_search"
  145. ),
  146. station_history: config.get("experimental.station_history"),
  147. soundcloud: config.get("experimental.soundcloud"),
  148. spotify: config.get("experimental.spotify")
  149. }
  150. },
  151. user
  152. };
  153. socket.log({
  154. type: "debug",
  155. message: `WebSocket opened #${socket.getSocketId()}`
  156. });
  157. socket.on("error", error =>
  158. socket.log({
  159. type: "error",
  160. message: error.message,
  161. data: { error }
  162. })
  163. );
  164. socket.on("close", async () => {
  165. const socketId = socket.getSocketId();
  166. const Job = EventsModule.getJob("unsubscribeAll");
  167. await JobQueue.runJob(Job, null, {
  168. socketId
  169. });
  170. socket.log({
  171. type: "debug",
  172. message: `WebSocket closed #${socketId}`
  173. });
  174. });
  175. socket.dispatch("ready", readyData);
  176. socket.on("message", message => this._handleMessage(socket, message));
  177. }
  178. /**
  179. * handleMessage - Handle websocket message
  180. */
  181. private async _handleMessage(socket: WebSocket, message: RawData) {
  182. if (JobQueue.getStatus().isPaused) {
  183. socket.close();
  184. return;
  185. }
  186. let callbackRef;
  187. try {
  188. const data = JSON.parse(message.toString());
  189. if (!Array.isArray(data) || data.length < 1)
  190. throw new Error("Invalid request");
  191. const [moduleJob, payload, options] = data;
  192. const [moduleName, ...jobNameParts] = moduleJob.split(".");
  193. const jobName = jobNameParts.join(".");
  194. const { callbackRef } = options ?? payload ?? {};
  195. if (!callbackRef)
  196. throw new Error(
  197. `No callback reference provided for job ${moduleJob}`
  198. );
  199. const module = ModuleManager.getModule(moduleName);
  200. if (!module) throw new Error(`Module "${moduleName}" not found`);
  201. const Job = module.getJob(jobName);
  202. if (!Job?.isApiEnabled())
  203. throw new Error(`Job "${jobName}" not found.`);
  204. let session;
  205. if (socket.getSessionId()) {
  206. const Session = await DataModule.getModel<SessionModel>(
  207. "sessions"
  208. );
  209. session = await Session.findByIdAndUpdate(
  210. socket.getSessionId(),
  211. {
  212. updatedAt: Date.now()
  213. }
  214. );
  215. if (!session) throw new Error("Session not found.");
  216. }
  217. await JobQueue.queueJob(Job, payload, {
  218. session,
  219. socketId: socket.getSocketId(),
  220. callbackRef
  221. });
  222. } catch (error) {
  223. const message = getErrorMessage(error);
  224. if (callbackRef)
  225. socket.dispatch("jobCallback", callbackRef, {
  226. status: "error",
  227. message
  228. });
  229. else socket.dispatch("error", message);
  230. }
  231. }
  232. /**
  233. * getSockets - Get websocket clients
  234. */
  235. public async getSockets() {
  236. return this._wsServer?.clients;
  237. }
  238. /**
  239. * getSocket - Get websocket client
  240. */
  241. public async getSocket(socketId?: string, sessionId?: Types.ObjectId) {
  242. if (!this._wsServer) return null;
  243. for (const clients of this._wsServer.clients.entries() as IterableIterator<
  244. [WebSocket, WebSocket]
  245. >) {
  246. const socket = clients.find(socket => {
  247. if (socket.getSocketId() === socketId) return true;
  248. if (socket.getSessionId() === sessionId) return true;
  249. return false;
  250. });
  251. if (socket) return socket;
  252. }
  253. return null;
  254. }
  255. /**
  256. * dispatch - Dispatch message to socket
  257. */
  258. public async dispatch(
  259. socketId: string,
  260. channel: string,
  261. ...values: unknown[]
  262. ) {
  263. const socket = await this.getSocket(socketId);
  264. if (!socket) return;
  265. socket.dispatch(channel, ...values);
  266. }
  267. /**
  268. * shutdown - Shutdown websocket module
  269. */
  270. public override async shutdown() {
  271. await super.shutdown();
  272. if (this._httpServer) this._httpServer.close();
  273. if (this._wsServer) this._wsServer.close();
  274. await this._stopped();
  275. }
  276. }
  277. export default new WebSocketModule();