WebSocketModule.ts 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import BaseModule from "../BaseModule";
  6. import { UniqueMethods } from "../types/Modules";
  7. import JobQueue from "../JobQueue";
  8. import WebSocket from "../WebSocket";
  9. export default class WebSocketModule extends BaseModule {
  10. private httpServer?: Server;
  11. private wsServer?: WebSocketServer;
  12. private jobQueue: JobQueue;
  13. private keepAliveInterval?: NodeJS.Timer;
  14. /**
  15. * WebSocket Module
  16. */
  17. public constructor() {
  18. super("websocket");
  19. this.jobQueue = JobQueue.getPrimaryInstance();
  20. }
  21. /**
  22. * startup - Startup websocket module
  23. */
  24. public override async startup() {
  25. await super.startup();
  26. this.httpServer = http
  27. .createServer(express())
  28. .listen(config.get("port"));
  29. this.wsServer = new WebSocketServer({
  30. server: this.httpServer,
  31. path: "/ws",
  32. WebSocket
  33. });
  34. this.wsServer.on(
  35. "connection",
  36. (socket: WebSocket, request: IncomingMessage) =>
  37. this.handleConnection(socket, request)
  38. );
  39. this.keepAliveInterval = setInterval(() => this.keepAlive(), 45000);
  40. this.wsServer.on("close", async () =>
  41. clearInterval(this.keepAliveInterval)
  42. );
  43. await super.started();
  44. }
  45. /**
  46. * keepAlive - Ping open clients and terminate closed
  47. */
  48. private async keepAlive() {
  49. if (!this.wsServer) return;
  50. for await (const clients of this.wsServer.clients.entries()) {
  51. await Promise.all(
  52. clients.map(async socket => {
  53. switch (socket.readyState) {
  54. case socket.OPEN:
  55. socket.ping();
  56. break;
  57. case socket.CLOSED:
  58. socket.terminate();
  59. break;
  60. default:
  61. break;
  62. }
  63. })
  64. );
  65. }
  66. }
  67. /**
  68. * handleConnection - Handle websocket connection
  69. */
  70. private async handleConnection(
  71. socket: WebSocket,
  72. request: IncomingMessage
  73. ) {
  74. if (this.jobQueue.getStatus().isPaused) {
  75. socket.close();
  76. return;
  77. }
  78. const readyData = {
  79. config: {
  80. cookie: config.get("cookie"),
  81. sitename: config.get("sitename"),
  82. recaptcha: {
  83. enabled: config.get("apis.recaptcha.enabled"),
  84. key: config.get("apis.recaptcha.key")
  85. },
  86. githubAuthentication: config.get("apis.github.enabled"),
  87. messages: config.get("messages"),
  88. christmas: config.get("christmas"),
  89. footerLinks: config.get("footerLinks"),
  90. shortcutOverrides: config.get("shortcutOverrides"),
  91. registrationDisabled: config.get("registrationDisabled"),
  92. mailEnabled: config.get("mail.enabled"),
  93. discogsEnabled: config.get("apis.discogs.enabled"),
  94. experimental: {
  95. changable_listen_mode: config.get(
  96. "experimental.changable_listen_mode"
  97. ),
  98. media_session: config.get("experimental.media_session"),
  99. disable_youtube_search: config.get(
  100. "experimental.disable_youtube_search"
  101. ),
  102. station_history: config.get("experimental.station_history"),
  103. soundcloud: config.get("experimental.soundcloud"),
  104. spotify: config.get("experimental.spotify")
  105. }
  106. },
  107. user: { loggedIn: false }
  108. };
  109. socket.dispatch("ready", readyData);
  110. socket.on("message", message => this.handleMessage(socket, message));
  111. }
  112. /**
  113. * handleMessage - Handle websocket message
  114. */
  115. private async handleMessage(socket: WebSocket, message: RawData) {
  116. if (this.jobQueue.getStatus().isPaused) {
  117. socket.close();
  118. return;
  119. }
  120. try {
  121. const data = JSON.parse(message.toString());
  122. if (!Array.isArray(data) || data.length < 1)
  123. throw new Error("Invalid request");
  124. const [moduleJob, payload, options] = data;
  125. const [moduleName, jobName] = moduleJob.split(".");
  126. const { CB_REF } = options ?? payload ?? {};
  127. await this.jobQueue
  128. .runJob(moduleName, jobName, payload)
  129. .then(res => socket.dispatch("CB_REF", CB_REF, res));
  130. } catch (error) {
  131. const message = error?.message ?? error;
  132. this.log({ type: "error", message });
  133. socket.dispatch("ERROR", error?.message ?? error);
  134. }
  135. }
  136. /**
  137. * shutdown - Shutdown websocket module
  138. */
  139. public override async shutdown() {
  140. await super.shutdown();
  141. if (this.httpServer) this.httpServer.close();
  142. if (this.wsServer) this.wsServer.close();
  143. }
  144. }
  145. export type WebSocketModuleJobs = {
  146. [Property in keyof UniqueMethods<WebSocketModule>]: {
  147. payload: Parameters<UniqueMethods<WebSocketModule>[Property]>[1];
  148. returns: Awaited<ReturnType<UniqueMethods<WebSocketModule>[Property]>>;
  149. };
  150. };