WebSocketModule.ts 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { Types } from "mongoose";
  6. import BaseModule from "../BaseModule";
  7. import { UniqueMethods } from "../types/Modules";
  8. import WebSocket from "../WebSocket";
  9. import JobContext from "../JobContext";
  10. import Job from "../Job";
  11. export default class WebSocketModule extends BaseModule {
  12. private httpServer?: Server;
  13. private wsServer?: WebSocketServer;
  14. private keepAliveInterval?: NodeJS.Timer;
  15. /**
  16. * WebSocket Module
  17. */
  18. public constructor() {
  19. super("websocket");
  20. }
  21. /**
  22. * startup - Startup websocket module
  23. */
  24. public override async startup() {
  25. await super.startup();
  26. this.httpServer = http
  27. .createServer(express())
  28. .listen(config.get("port"));
  29. this.wsServer = new WebSocketServer({
  30. server: this.httpServer,
  31. path: "/ws",
  32. WebSocket
  33. });
  34. this.wsServer.on(
  35. "connection",
  36. (socket: WebSocket, request: IncomingMessage) =>
  37. this.handleConnection(socket, request)
  38. );
  39. this.keepAliveInterval = setInterval(() => this.keepAlive(), 45000);
  40. this.wsServer.on("close", async () =>
  41. clearInterval(this.keepAliveInterval)
  42. );
  43. await super.started();
  44. }
  45. /**
  46. * keepAlive - Ping open clients and terminate closed
  47. */
  48. private async keepAlive() {
  49. if (!this.wsServer) return;
  50. for await (const clients of this.wsServer.clients.entries()) {
  51. await Promise.all(
  52. clients.map(async socket => {
  53. switch (socket.readyState) {
  54. case socket.OPEN:
  55. socket.ping();
  56. break;
  57. case socket.CLOSED:
  58. socket.terminate();
  59. break;
  60. default:
  61. break;
  62. }
  63. })
  64. );
  65. }
  66. }
  67. /**
  68. * handleConnection - Handle websocket connection
  69. */
  70. private async handleConnection(
  71. socket: WebSocket,
  72. request: IncomingMessage
  73. ) {
  74. if (this.jobQueue.getStatus().isPaused) {
  75. socket.close();
  76. return;
  77. }
  78. const readyData = await new Job("prepareWebsocket", "api", {
  79. socket,
  80. request
  81. }).execute();
  82. socket.log({
  83. type: "debug",
  84. message: `WebSocket opened #${socket.getSocketId()}`
  85. });
  86. socket.on("error", error =>
  87. socket.log({
  88. type: "error",
  89. message: error.message,
  90. data: { error }
  91. })
  92. );
  93. socket.on("close", async () => {
  94. socket.log({
  95. type: "debug",
  96. message: `WebSocket closed #${socket.getSocketId()}`
  97. });
  98. });
  99. socket.dispatch("ready", readyData);
  100. socket.on("message", message => this.handleMessage(socket, message));
  101. }
  102. /**
  103. * handleMessage - Handle websocket message
  104. */
  105. private async handleMessage(socket: WebSocket, message: RawData) {
  106. if (this.jobQueue.getStatus().isPaused) {
  107. socket.close();
  108. return;
  109. }
  110. let callbackRef;
  111. try {
  112. const data = JSON.parse(message.toString());
  113. if (!Array.isArray(data) || data.length < 1)
  114. throw new Error("Invalid request");
  115. const [moduleJob, payload, options] = data;
  116. const [moduleName, jobName] = moduleJob.split(".");
  117. callbackRef = (options ?? payload ?? {}).CB_REF;
  118. if (!callbackRef)
  119. throw new Error(
  120. `No callback reference provided for job ${moduleJob}`
  121. );
  122. if (moduleName === "data") {
  123. const res = await this.jobQueue.runJob("api", "runDataJob", {
  124. jobName,
  125. payload,
  126. sessionId: socket.getSessionId(),
  127. socketId: socket.getSocketId()
  128. });
  129. socket.dispatch("CB_REF", callbackRef, {
  130. status: "success",
  131. data: res
  132. });
  133. } else {
  134. const res = await this.jobQueue.runJob("api", "runJob", {
  135. moduleName,
  136. jobName,
  137. payload,
  138. sessionId: socket.getSessionId(),
  139. socketId: socket.getSocketId()
  140. });
  141. socket.dispatch("CB_REF", callbackRef, {
  142. status: "success",
  143. data: res
  144. });
  145. }
  146. } catch (error) {
  147. const message = error?.message ?? error;
  148. socket.log({ type: "error", message });
  149. if (callbackRef)
  150. socket.dispatch("CB_REF", callbackRef, {
  151. status: "error",
  152. message
  153. });
  154. else socket.dispatch("ERROR", message);
  155. }
  156. }
  157. /**
  158. * getSockets - Get websocket clients
  159. */
  160. public async getSockets(context: JobContext) {
  161. return this.wsServer?.clients;
  162. }
  163. /**
  164. * getSocket - Get websocket client
  165. */
  166. public async getSocket(
  167. context: JobContext,
  168. {
  169. socketId,
  170. sessionId
  171. }: { socketId?: string; sessionId?: Types.ObjectId }
  172. ) {
  173. if (!this.wsServer) return null;
  174. for (const clients of this.wsServer.clients.entries() as IterableIterator<
  175. [WebSocket, WebSocket]
  176. >) {
  177. const socket = clients.find(socket => {
  178. if (socket.getSocketId() === socketId) return true;
  179. if (socket.getSessionId() === sessionId) return true;
  180. return false;
  181. });
  182. if (socket) return socket;
  183. }
  184. return null;
  185. }
  186. /**
  187. * dispatch - Dispatch message to socket
  188. */
  189. public async dispatch(
  190. context: JobContext,
  191. {
  192. socketId,
  193. channel,
  194. value
  195. }: { socketId: string; channel: string; value?: any }
  196. ) {
  197. const socket = await context.executeJob("websocket", "getSocket", {
  198. socketId
  199. });
  200. if (!socket) return;
  201. socket.dispatch(channel, value);
  202. }
  203. /**
  204. * shutdown - Shutdown websocket module
  205. */
  206. public override async shutdown() {
  207. await super.shutdown();
  208. if (this.httpServer) this.httpServer.close();
  209. if (this.wsServer) this.wsServer.close();
  210. await this.stopped();
  211. }
  212. }
  213. export type WebSocketModuleJobs = {
  214. [Property in keyof UniqueMethods<WebSocketModule>]: {
  215. payload: Parameters<UniqueMethods<WebSocketModule>[Property]>[1];
  216. returns: Awaited<ReturnType<UniqueMethods<WebSocketModule>[Property]>>;
  217. };
  218. };