WebSocketModule.ts 5.5 KB


  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { Types } from "mongoose";
  6. import BaseModule from "../BaseModule";
  7. import { UniqueMethods } from "../types/Modules";
  8. import WebSocket from "../WebSocket";
  9. import JobContext from "../JobContext";
  10. import Job from "../Job";
  /**
   * WebSocket Module
   *
   * Serves an express app over HTTP on the configured port and attaches a
   * WebSocket server at the `/ws` path. Each incoming message is expected to
   * be a JSON array of the form `["module.job", payload, options]`; the job is
   * run through the job queue and its outcome is dispatched back to the
   * socket on the `CB_REF` channel.
   */
  export default class WebSocketModule extends BaseModule {
    private _httpServer?: Server;

    private _wsServer?: WebSocketServer;

    // ReturnType<typeof setInterval> stays valid for clearInterval() across
    // @types/node versions, unlike the deprecated NodeJS.Timer.
    private _keepAliveInterval?: ReturnType<typeof setInterval>;

    /**
     * WebSocket Module
     */
    public constructor() {
      super("websocket");

      this._jobApiDefault = false;
    }

    /**
     * startup - Startup websocket module
     *
     * Creates the HTTP server, attaches the WebSocket server to it, wires the
     * connection handler and starts the keep-alive timer.
     */
    public override async startup() {
      await super.startup();

      this._httpServer = http
        .createServer(express())
        .listen(config.get("port"));

      this._wsServer = new WebSocketServer({
        server: this._httpServer,
        path: "/ws",
        WebSocket
      });

      this._wsServer.on(
        "connection",
        (socket: WebSocket, request: IncomingMessage) =>
          this._handleConnection(socket, request)
      );

      // Ping connected clients every 45 seconds.
      this._keepAliveInterval = setInterval(() => this._keepAlive(), 45000);

      this._wsServer.on("close", () => this._clearKeepAlive());

      await super._started();
    }

    /**
     * clearKeepAlive - Stop the keep-alive timer, if one is running
     */
    private _clearKeepAlive() {
      if (this._keepAliveInterval === undefined) return;

      clearInterval(this._keepAliveInterval);
      this._keepAliveInterval = undefined;
    }

    /**
     * keepAlive - Ping open clients and terminate closed
     */
    private async _keepAlive() {
      if (!this._wsServer) return;

      // `clients` is a Set: iterating it directly visits every socket exactly
      // once, whereas `entries()` yields `[socket, socket]` pairs and would
      // ping each client twice.
      for (const socket of this._wsServer.clients) {
        switch (socket.readyState) {
          case socket.OPEN:
            socket.ping();
            break;
          case socket.CLOSED:
            socket.terminate();
            break;
          default:
            break;
        }
      }
    }

    /**
     * handleConnection - Handle websocket connection
     *
     * Runs the `prepareWebsocket` job for the new socket, registers error and
     * close logging, dispatches the `ready` event with the job result and
     * only then starts handling incoming messages.
     *
     * @param socket - The newly connected socket
     * @param request - The HTTP upgrade request that opened the socket
     */
    private async _handleConnection(
      socket: WebSocket,
      request: IncomingMessage
    ) {
      if (this._jobQueue.getStatus().isPaused) {
        socket.close();
        return;
      }

      let readyData;

      try {
        readyData = await new Job("prepareWebsocket", "api", {
          socket,
          request
        }).execute();
      } catch (error) {
        // This method is invoked from an event listener, so nobody awaits
        // it: a rejection here would surface as an unhandled rejection and
        // leave a half-initialised socket open.
        socket.log({
          type: "error",
          message: error instanceof Error ? error.message : String(error),
          data: { error }
        });
        socket.close();
        return;
      }

      socket.log({
        type: "debug",
        message: `WebSocket opened #${socket.getSocketId()}`
      });

      socket.on("error", error =>
        socket.log({
          type: "error",
          message: error.message,
          data: { error }
        })
      );

      socket.on("close", async () => {
        socket.log({
          type: "debug",
          message: `WebSocket closed #${socket.getSocketId()}`
        });
      });

      socket.dispatch("ready", readyData);

      socket.on("message", message => this._handleMessage(socket, message));
    }

    /**
     * handleMessage - Handle websocket message
     *
     * Parses the message as `["module.job", payload, options]`, runs the job
     * through the job queue and dispatches the result on `CB_REF`. Failures
     * are logged and reported on `CB_REF` when a callback reference is known,
     * otherwise on the `ERROR` channel.
     *
     * @param socket - The socket the message was received on
     * @param message - The raw message data
     */
    private async _handleMessage(socket: WebSocket, message: RawData) {
      if (this._jobQueue.getStatus().isPaused) {
        socket.close();
        return;
      }

      let callbackRef;

      try {
        const data = JSON.parse(message.toString());

        if (!Array.isArray(data) || data.length < 1)
          throw new Error("Invalid request");

        const [moduleJob, payload, options] = data;

        // The job identifier comes from the client; reject anything that
        // cannot be split instead of failing with an internal TypeError.
        if (typeof moduleJob !== "string")
          throw new Error("Invalid request");

        // Only the first segment is the module name; the job name itself may
        // contain further dots.
        const [moduleName, ...jobNameParts] = moduleJob.split(".");
        const jobName = jobNameParts.join(".");

        callbackRef = (options ?? payload ?? {}).CB_REF;

        if (!callbackRef)
          throw new Error(
            `No callback reference provided for job ${moduleJob}`
          );

        const module = this._moduleManager.getModule(moduleName);

        if (!module) throw new Error(`Module "${moduleName}" not found`);

        const job = module.getJob(jobName);

        // Jobs that are not exposed to the API are reported as missing.
        if (!job.api) throw new Error(`Job "${jobName}" not found.`);

        const res = await this._jobQueue.runJob("api", "runJob", {
          moduleName,
          jobName,
          payload,
          sessionId: socket.getSessionId(),
          socketId: socket.getSocketId()
        });

        socket.dispatch("CB_REF", callbackRef, {
          status: "success",
          data: res
        });
      } catch (error) {
        const errorMessage = error?.message ?? error;

        socket.log({ type: "error", message: errorMessage });

        if (callbackRef)
          socket.dispatch("CB_REF", callbackRef, {
            status: "error",
            message: errorMessage
          });
        else socket.dispatch("ERROR", errorMessage);
      }
    }

    /**
     * getSockets - Get websocket clients
     *
     * @returns The server's client set, or undefined if the server has not
     * been started
     */
    public async getSockets(context: JobContext) {
      return this._wsServer?.clients;
    }

    /**
     * getSocket - Get websocket client
     *
     * Returns the first connected socket matching the given socket id or
     * session id. A criterion that is not provided is never used for
     * matching, so a lookup by `socketId` alone cannot accidentally return a
     * socket that merely has no session.
     *
     * @param payload.socketId - Socket id to look for
     * @param payload.sessionId - Session id to look for
     * @returns The matching socket, or null if there is none
     */
    public async getSocket(
      context: JobContext,
      {
        socketId,
        sessionId
      }: { socketId?: string; sessionId?: Types.ObjectId }
    ) {
      if (!this._wsServer) return null;

      const hasSocketId = socketId != null;
      const hasSessionId = sessionId != null;

      if (!hasSocketId && !hasSessionId) return null;

      for (const socket of this._wsServer.clients as Set<WebSocket>) {
        if (hasSocketId && socket.getSocketId() === socketId) return socket;

        if (hasSessionId) {
          const socketSessionId = socket.getSessionId();

          // ObjectIds are objects, so `===` only matches the very same
          // instance; compare their string forms instead.
          // NOTE(review): assumes getSessionId() returns an ObjectId or its
          // string form - confirm against the WebSocket class.
          if (
            socketSessionId != null &&
            String(socketSessionId) === String(sessionId)
          )
            return socket;
        }
      }

      return null;
    }

    /**
     * dispatch - Dispatch message to socket
     *
     * Looks the socket up through the `getSocket` job and does nothing when
     * no such socket exists.
     *
     * @param payload.socketId - Id of the socket to dispatch to
     * @param payload.channel - Channel to dispatch on
     * @param payload.value - Optional value to send
     */
    public async dispatch(
      context: JobContext,
      {
        socketId,
        channel,
        value
      }: { socketId: string; channel: string; value?: any }
    ) {
      const socket = await context.executeJob("websocket", "getSocket", {
        socketId
      });

      if (!socket) return;

      socket.dispatch(channel, value);
    }

    /**
     * shutdown - Shutdown websocket module
     */
    public override async shutdown() {
      await super.shutdown();

      // Stop the timer explicitly: with an external HTTP server the ws
      // "close" event is only emitted once every client has disconnected.
      this._clearKeepAlive();

      if (this._httpServer) this._httpServer.close();

      if (this._wsServer) {
        this._wsServer.close();

        // Closing the server does not close existing connections, so close
        // them here to let both servers finish shutting down.
        for (const socket of this._wsServer.clients) socket.close();
      }

      await this._stopped();
    }
  }
  // Methods of WebSocketModule selected by the project's UniqueMethods helper.
  type WebSocketModuleMethods = UniqueMethods<WebSocketModule>;

  /**
   * Maps each of those methods to its payload type (the method's second
   * parameter) and its awaited return type.
   */
  export type WebSocketModuleJobs = {
    [JobName in keyof WebSocketModuleMethods]: {
      payload: Parameters<WebSocketModuleMethods[JobName]>[1];
      returns: Awaited<ReturnType<WebSocketModuleMethods[JobName]>>;
    };
  };