EventsModule.ts 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. import {
  2. createClient,
  3. RedisClientOptions,
  4. RedisClientType,
  5. RedisDefaultModules,
  6. RedisFunctions,
  7. RedisModules,
  8. RedisScripts
  9. } from "redis";
  10. import config from "config";
  11. import { forEachIn } from "@common/utils/forEachIn";
  12. import BaseModule, { ModuleStatus } from "@/BaseModule";
  13. import WebSocketModule from "./WebSocketModule";
  14. import Event from "@/modules/EventsModule/Event";
  15. import ModuleManager from "@/ModuleManager";
  16. export class EventsModule extends BaseModule {
  17. private _pubClient?: RedisClientType<
  18. RedisDefaultModules & RedisModules,
  19. RedisFunctions,
  20. RedisScripts
  21. >;
  22. private _subClient?: RedisClientType<
  23. RedisDefaultModules & RedisModules,
  24. RedisFunctions,
  25. RedisScripts
  26. >;
  27. private _subscriptions: Record<string, ((message: any) => Promise<void>)[]>;
  28. private _pSubscriptions: Record<
  29. string,
  30. ((event: Event) => Promise<void>)[]
  31. >;
  32. private _socketSubscriptions: Record<string, Set<string>>;
  33. private _scheduleCallbacks: Record<string, (() => Promise<void>)[]>;
  34. /**
  35. * Events Module
  36. */
  37. public constructor() {
  38. super("events");
  39. this._subscriptions = {};
  40. this._pSubscriptions = {};
  41. this._socketSubscriptions = {};
  42. this._scheduleCallbacks = {};
  43. }
  44. /**
  45. * startup - Startup events module
  46. */
  47. public override async startup() {
  48. await super.startup();
  49. await this._createPubClient();
  50. await this._createSubClient();
  51. await super._started();
  52. }
  53. /**
  54. * createPubClient - Create redis client for publishing
  55. */
  56. private async _createPubClient() {
  57. this._pubClient = createClient({
  58. ...config.get<RedisClientOptions>("redis"),
  59. socket: {
  60. reconnectStrategy: (retries: number, error) => {
  61. if (
  62. retries >= 10 ||
  63. ![ModuleStatus.STARTING, ModuleStatus.STARTED].includes(
  64. this.getStatus()
  65. )
  66. )
  67. return false;
  68. this.log({
  69. type: "debug",
  70. message: `Redis reconnect attempt ${retries}`,
  71. data: error
  72. });
  73. return Math.min(retries * 50, 500);
  74. }
  75. }
  76. });
  77. this._pubClient.on("error", error => {
  78. this.log({ type: "error", message: error.message, data: error });
  79. this.setStatus(ModuleStatus.ERROR);
  80. });
  81. this._pubClient.on("ready", () => {
  82. this.log({ type: "debug", message: "Redis connection ready" });
  83. if (this.getStatus() === ModuleStatus.ERROR)
  84. this.setStatus(ModuleStatus.STARTED);
  85. });
  86. await this._pubClient.connect();
  87. const redisConfigResponse = await this._pubClient.sendCommand([
  88. "CONFIG",
  89. "GET",
  90. "notify-keyspace-events"
  91. ]);
  92. if (
  93. !(
  94. Array.isArray(redisConfigResponse) &&
  95. redisConfigResponse[1] === "xE"
  96. )
  97. )
  98. throw new Error(
  99. `notify-keyspace-events is NOT configured correctly! It is set to: ${
  100. (Array.isArray(redisConfigResponse) &&
  101. redisConfigResponse[1]) ||
  102. "unknown"
  103. }`
  104. );
  105. }
  106. /**
  107. * createSubClient - Create redis client for subscribing
  108. */
  109. private async _createSubClient() {
  110. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  111. this._subClient = this._pubClient?.duplicate();
  112. this._subClient.on("error", error => {
  113. this.log({ type: "error", message: error.message, data: error });
  114. this.setStatus(ModuleStatus.ERROR);
  115. });
  116. this._subClient.on("ready", () => {
  117. this.log({ type: "debug", message: "Redis connection ready" });
  118. if (this.getStatus() === ModuleStatus.ERROR)
  119. this.setStatus(ModuleStatus.STARTED);
  120. });
  121. await this._subClient.connect();
  122. const { database = 0 } = this._subClient.options ?? {};
  123. await this._subClient.pSubscribe("event.*", async (...args) =>
  124. this._subscriptionListener(...args)
  125. );
  126. await this._subClient.pSubscribe(
  127. `__keyevent@${database}__:expired`,
  128. async message => {
  129. const type = message.substring(0, message.indexOf("."));
  130. if (type !== "schedule") return;
  131. const channel = message.substring(message.indexOf(".") + 1);
  132. if (!this._scheduleCallbacks[channel]) return;
  133. await forEachIn(this._scheduleCallbacks[channel], callback =>
  134. callback()
  135. );
  136. }
  137. );
  138. }
  139. public getEvent(path: string) {
  140. const moduleName = path.substring(0, path.indexOf("."));
  141. const eventName = path.substring(path.indexOf(".") + 1);
  142. if (moduleName === this._name) return super.getEvent(eventName);
  143. const module = ModuleManager.getModule(moduleName);
  144. if (!module) throw new Error(`Module "${moduleName}" not found`);
  145. return module.getEvent(eventName);
  146. }
  147. public getAllEvents() {
  148. return Object.fromEntries(
  149. Object.entries(ModuleManager.getModules() ?? {}).map(
  150. ([name, module]) => [name, Object.keys(module.getEvents())]
  151. )
  152. );
  153. }
  154. /**
  155. * createKey - Create hex key
  156. */
  157. private _createKey(type: "event" | "schedule", channel: string) {
  158. if (!["event", "schedule"].includes(type))
  159. throw new Error("Invalid type");
  160. if (!channel || typeof channel !== "string")
  161. throw new Error("Invalid channel");
  162. return `${type}.${channel}`;
  163. }
  164. /**
  165. * publish - Publish an event
  166. */
  167. public async publish(event: Event) {
  168. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  169. const channel = event.getKey();
  170. const value = event.makeMessage();
  171. await this._pubClient.publish(this._createKey("event", channel), value);
  172. }
  173. /**
  174. * subscriptionListener - Listener for event subscriptions
  175. */
  176. private async _subscriptionListener(message: string, key: string) {
  177. const type = key.substring(0, key.indexOf("."));
  178. if (type !== "event") return;
  179. key = key.substring(key.indexOf(".") + 1);
  180. const { path, scope } = Event.parseKey(key);
  181. const EventClass = this.getEvent(path);
  182. const parsedMessage = Event.parseMessage(message);
  183. const event = new EventClass(parsedMessage, scope);
  184. if (this._subscriptions && this._subscriptions[key])
  185. await forEachIn(this._subscriptions[key], async cb => cb(event));
  186. if (this._pSubscriptions)
  187. await forEachIn(
  188. Object.entries(this._pSubscriptions).filter(([subscription]) =>
  189. new RegExp(subscription).test(key)
  190. ),
  191. async ([, callbacks]) =>
  192. forEachIn(callbacks, async cb => cb(event))
  193. );
  194. if (!this._socketSubscriptions[key]) return;
  195. for await (const socketId of this._socketSubscriptions[key].values()) {
  196. await WebSocketModule.dispatch(socketId, key, event.getData());
  197. }
  198. }
  199. /**
  200. * subscribe - Subscribe to an event or schedule completion
  201. */
  202. public async subscribe(
  203. EventClass: any,
  204. callback: (message?: any) => Promise<void>,
  205. scope?: string
  206. ) {
  207. if (!this._subClient) throw new Error("Redis subClient unavailable.");
  208. const key = EventClass.getKey(scope);
  209. const type = EventClass.getType();
  210. if (type === "schedule") {
  211. if (!this._scheduleCallbacks[key])
  212. this._scheduleCallbacks[key] = [];
  213. this._scheduleCallbacks[key].push(() => callback());
  214. return;
  215. }
  216. if (!this._subscriptions[key]) this._subscriptions[key] = [];
  217. this._subscriptions[key].push(callback);
  218. }
  219. /**
  220. * pSubscribe - Subscribe to an event with pattern
  221. */
  222. public async pSubscribe(
  223. pattern: string,
  224. callback: (event: Event) => Promise<void>
  225. ) {
  226. if (!this._subClient) throw new Error("Redis subClient unavailable.");
  227. if (!this._pSubscriptions[pattern]) this._pSubscriptions[pattern] = [];
  228. this._pSubscriptions[pattern].push(callback);
  229. }
  230. /**
  231. * unsubscribe - Unsubscribe from an event or schedule completion
  232. */
  233. public async unsubscribe(
  234. type: "event" | "schedule",
  235. channel: string,
  236. callback: (message?: any) => Promise<void>
  237. ) {
  238. if (!this._subClient) throw new Error("Redis subClient unavailable.");
  239. if (type === "schedule") {
  240. if (!this._scheduleCallbacks[channel]) return;
  241. const index = this._scheduleCallbacks[channel].findIndex(
  242. schedule => schedule.toString() === callback.toString()
  243. );
  244. if (index >= 0) this._scheduleCallbacks[channel].splice(index, 1);
  245. return;
  246. }
  247. if (!this._subscriptions[channel]) return;
  248. const index = this._subscriptions[channel].findIndex(
  249. subscription => subscription.toString() === callback.toString()
  250. );
  251. if (index < 0) return;
  252. this._subscriptions[channel].splice(index, 1);
  253. if (this._subscriptions[channel].length === 0)
  254. delete this._subscriptions[channel];
  255. }
  256. /**
  257. * schedule - Schedule a callback trigger
  258. */
  259. public async schedule(channel: string, time: number) {
  260. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  261. if (typeof time !== "number") throw new Error("Time must be a number");
  262. time = Math.round(time);
  263. if (time <= 0) throw new Error("Time must be greater than 0");
  264. await this._pubClient.set(this._createKey("schedule", channel), "", {
  265. PX: time,
  266. NX: true
  267. });
  268. }
  269. /**
  270. * unschedule - Unschedule a callback trigger
  271. */
  272. public async unschedule(channel: string) {
  273. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  274. await this._pubClient.del(this._createKey("schedule", channel));
  275. }
  276. public async subscribeSocket(channel: string, socketId: string) {
  277. if (!this._socketSubscriptions[channel]) {
  278. this._socketSubscriptions[channel] = new Set();
  279. }
  280. if (this._socketSubscriptions[channel].has(socketId)) return;
  281. this._socketSubscriptions[channel].add(socketId);
  282. }
  283. public async subscribeManySocket(channels: string[], socketId: string) {
  284. await forEachIn(channels, channel =>
  285. this.subscribeSocket(channel, socketId)
  286. );
  287. }
  288. public async unsubscribeSocket(channel: string, socketId: string) {
  289. if (
  290. !(
  291. this._socketSubscriptions[channel] &&
  292. this._socketSubscriptions[channel].has(socketId)
  293. )
  294. )
  295. return;
  296. this._socketSubscriptions[channel].delete(socketId);
  297. if (this._socketSubscriptions[channel].size === 0)
  298. delete this._socketSubscriptions[channel];
  299. }
  300. public async unsubscribeManySocket(channels: string[], socketId: string) {
  301. await forEachIn(channels, channel =>
  302. this.unsubscribeSocket(channel, socketId)
  303. );
  304. }
  305. public async unsubscribeAllSocket(socketId: string) {
  306. await forEachIn(
  307. Object.entries(this._socketSubscriptions).filter(([, socketIds]) =>
  308. socketIds.has(socketId)
  309. ),
  310. async ([channel]) => this.unsubscribeSocket(channel, socketId)
  311. );
  312. }
  313. /**
  314. * shutdown - Shutdown events module
  315. */
  316. public override async shutdown() {
  317. await super.shutdown();
  318. if (this._pubClient) await this._pubClient.quit();
  319. if (this._subClient) await this._subClient.quit();
  320. this._subscriptions = {};
  321. this._scheduleCallbacks = {};
  322. await this._stopped();
  323. }
  324. }
  325. export default new EventsModule();