WebSocketModule.ts 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { Types } from "mongoose";
  6. import BaseModule from "../BaseModule";
  7. import { UniqueMethods } from "../types/Modules";
  8. import WebSocket from "../WebSocket";
  9. import JobContext from "../JobContext";
  10. import Job from "../Job";
  /**
   * WebSocketModule - Serves WebSocket clients on the "/ws" path of an HTTP
   * server and forwards their messages to the job queue.
   */
  export default class WebSocketModule extends BaseModule {
    /** Delay between keep-alive sweeps, in milliseconds. */
    private static readonly KEEP_ALIVE_INTERVAL_MS = 45000;

    private httpServer?: Server;

    private wsServer?: WebSocketServer;

    private keepAliveInterval?: ReturnType<typeof setInterval>;

    /**
     * WebSocket Module
     */
    public constructor() {
      super("websocket");
    }

    /**
     * startup - Startup websocket module
     *
     * Creates the HTTP server on the configured port, attaches the WebSocket
     * server to it and starts the periodic keep-alive sweep.
     */
    public override async startup() {
      await super.startup();

      this.httpServer = http
        .createServer(express())
        .listen(config.get("port"));

      this.wsServer = new WebSocketServer({
        server: this.httpServer,
        path: "/ws",
        WebSocket
      });

      this.wsServer.on(
        "connection",
        (socket: WebSocket, request: IncomingMessage) =>
          this.handleConnection(socket, request)
      );

      this.keepAliveInterval = setInterval(
        () => this.keepAlive(),
        WebSocketModule.KEEP_ALIVE_INTERVAL_MS
      );

      this.wsServer.on("close", () => this.stopKeepAlive());

      await super.started();
    }

    /**
     * stopKeepAlive - Cancel the keep-alive timer if it is running
     */
    private stopKeepAlive() {
      if (this.keepAliveInterval === undefined) return;

      clearInterval(this.keepAliveInterval);
      this.keepAliveInterval = undefined;
    }

    /**
     * describeError - Extract a printable message from a caught value
     *
     * @param error - Whatever was thrown; not necessarily an Error instance
     * @returns The value's message when it has one, else its string form
     */
    private static describeError(error: unknown): string {
      if (error instanceof Error) return error.message;

      if (typeof error === "object" && error !== null && "message" in error)
        return String((error as { message: unknown }).message);

      return String(error);
    }

    /**
     * keepAlive - Ping open clients and terminate closed
     */
    private keepAlive() {
      if (!this.wsServer) return;

      // Iterate the Set itself: Set.prototype.entries() yields
      // [value, value] pairs, which would visit every socket twice.
      for (const socket of this.wsServer.clients) {
        switch (socket.readyState) {
          case socket.OPEN:
            socket.ping();
            break;
          case socket.CLOSED:
            socket.terminate();
            break;
          default:
            break;
        }
      }
    }

    /**
     * handleConnection - Handle websocket connection
     *
     * Closes the socket straight away while the job queue is paused.
     * Otherwise runs the "prepareWebsocket" job, dispatches its result on the
     * "ready" channel and starts listening for messages. Any failure during
     * setup is logged and the socket is closed.
     *
     * @param socket - The newly connected socket
     * @param request - The HTTP request that opened the connection
     */
    private async handleConnection(
      socket: WebSocket,
      request: IncomingMessage
    ) {
      if (this.jobQueue.getStatus().isPaused) {
        socket.close();
        return;
      }

      // Registered before the first await: an "error" event emitted while no
      // listener is attached is thrown by the EventEmitter.
      socket.on("error", error =>
        socket.log({
          type: "error",
          message: error.message,
          data: { error }
        })
      );

      try {
        const readyData = await new Job("prepareWebsocket", "api", {
          socket,
          request
        }).execute();

        socket.log({
          type: "debug",
          message: `WebSocket opened #${socket.getSocketId()}`
        });

        socket.on("close", () => {
          socket.log({
            type: "debug",
            message: `WebSocket closed #${socket.getSocketId()}`
          });
        });

        socket.dispatch("ready", readyData);

        socket.on("message", message => this.handleMessage(socket, message));
      } catch (error) {
        // This method is invoked from an event listener, so a rejection here
        // would otherwise go unhandled.
        socket.log({
          type: "error",
          message: WebSocketModule.describeError(error)
        });
        socket.close();
      }
    }

    /**
     * handleMessage - Handle websocket message
     *
     * Expects a JSON array `[moduleJob, payload?, options?]` where moduleJob
     * is "<module>.<job>" and a CB_REF is present on options (or on payload
     * when options is absent). The job result or the error message is
     * dispatched back on the "CB_REF" channel; failures that occur before a
     * callback reference is known are dispatched on "ERROR".
     *
     * @param socket - The socket the message arrived on
     * @param message - The raw message data
     */
    private async handleMessage(socket: WebSocket, message: RawData) {
      if (this.jobQueue.getStatus().isPaused) {
        socket.close();
        return;
      }

      let callbackRef;

      try {
        const data: unknown = JSON.parse(message.toString());

        if (!Array.isArray(data) || data.length < 1)
          throw new Error("Invalid request");

        const [moduleJob, payload, options] = data;

        // Reject a non-string job name here rather than failing on .split().
        if (typeof moduleJob !== "string") throw new Error("Invalid request");

        const [moduleName, ...jobNameParts] = moduleJob.split(".");
        const jobName = jobNameParts.join(".");

        callbackRef = (options ?? payload ?? {}).CB_REF;

        if (!callbackRef)
          throw new Error(
            `No callback reference provided for job ${moduleJob}`
          );

        const res = await this.jobQueue.runJob("api", "runJob", {
          moduleName,
          jobName,
          payload,
          sessionId: socket.getSessionId(),
          socketId: socket.getSocketId()
        });

        socket.dispatch("CB_REF", callbackRef, {
          status: "success",
          data: res
        });
      } catch (error) {
        const errorMessage = WebSocketModule.describeError(error);

        socket.log({ type: "error", message: errorMessage });

        if (callbackRef)
          socket.dispatch("CB_REF", callbackRef, {
            status: "error",
            message: errorMessage
          });
        else socket.dispatch("ERROR", errorMessage);
      }
    }

    /**
     * getSockets - Get websocket clients
     *
     * @returns The server's client set, or undefined before startup
     */
    public async getSockets(context: JobContext) {
      return this.wsServer?.clients;
    }

    /**
     * getSocket - Get websocket client
     *
     * Returns the first connected socket matching the given socket id or
     * session id. A criterion that is not supplied is ignored, so sockets
     * whose own id is unset never match by accident.
     *
     * @returns The matching socket, or null when none matches
     */
    public async getSocket(
      context: JobContext,
      {
        socketId,
        sessionId
      }: { socketId?: string; sessionId?: Types.ObjectId }
    ) {
      if (!this.wsServer) return null;

      // Compare session ids by string form: two ObjectId instances holding
      // the same id are not equal under ===.
      const wantedSessionId = sessionId?.toString();

      for (const socket of this.wsServer.clients as Set<WebSocket>) {
        if (socketId !== undefined && socket.getSocketId() === socketId)
          return socket;

        if (
          wantedSessionId !== undefined &&
          socket.getSessionId()?.toString() === wantedSessionId
        )
          return socket;
      }

      return null;
    }

    /**
     * dispatch - Dispatch message to socket
     *
     * Looks the socket up through the "getSocket" job and does nothing when
     * no socket is found.
     */
    public async dispatch(
      context: JobContext,
      {
        socketId,
        channel,
        value
      }: { socketId: string; channel: string; value?: any }
    ) {
      const socket = await context.executeJob("websocket", "getSocket", {
        socketId
      });

      if (!socket) return;

      socket.dispatch(channel, value);
    }

    /**
     * shutdown - Shutdown websocket module
     *
     * Stops the keep-alive timer, closes connected clients, then closes the
     * HTTP and WebSocket servers.
     */
    public override async shutdown() {
      await super.shutdown();

      // Clear the timer here instead of relying on the server "close" event,
      // which is not emitted while clients are still connected.
      this.stopKeepAlive();

      if (this.wsServer) {
        // Closing the server does not close existing connections.
        for (const socket of this.wsServer.clients) socket.close();
      }

      if (this.httpServer) this.httpServer.close();

      if (this.wsServer) this.wsServer.close();

      await this.stopped();
    }
  }
  /**
   * The methods of WebSocketModule selected by UniqueMethods.
   */
  type WebSocketModuleMethods = UniqueMethods<WebSocketModule>;

  /**
   * WebSocketModuleJobs - For each key of UniqueMethods<WebSocketModule>,
   * the type of the method's second parameter (payload) and its awaited
   * return type (returns).
   */
  export type WebSocketModuleJobs = {
    [JobName in keyof WebSocketModuleMethods]: {
      payload: Parameters<WebSocketModuleMethods[JobName]>[1];
      returns: Awaited<ReturnType<WebSocketModuleMethods[JobName]>>;
    };
  };