EventsModule.ts 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. import {
  2. createClient,
  3. RedisClientOptions,
  4. RedisClientType,
  5. RedisDefaultModules,
  6. RedisFunctions,
  7. RedisModules,
  8. RedisScripts
  9. } from "redis";
  10. import config from "config";
  11. import { forEachIn } from "@common/utils/forEachIn";
  12. import BaseModule, { ModuleStatus } from "@/BaseModule";
  13. import WebSocketModule from "./WebSocketModule";
  14. import TestEvent from "@/modules/EventsModule/events/TestEvent";
  15. import Event from "@/modules/EventsModule/Event";
  16. import { EventDerived } from "@/types/EventDerived";
  17. import assertEventDerived from "@/utils/assertEventDerived";
  18. export class EventsModule extends BaseModule {
  19. private _pubClient?: RedisClientType<
  20. RedisDefaultModules & RedisModules,
  21. RedisFunctions,
  22. RedisScripts
  23. >;
  24. private _subClient?: RedisClientType<
  25. RedisDefaultModules & RedisModules,
  26. RedisFunctions,
  27. RedisScripts
  28. >;
  29. private _events: Record<string, typeof Event>;
  30. private _subscriptions: Record<string, ((message: any) => Promise<void>)[]>;
  31. private _pSubscriptions: Record<
  32. string,
  33. ((message: any) => Promise<void>)[]
  34. >;
  35. private _socketSubscriptions: Record<string, Set<string>>;
  36. private _scheduleCallbacks: Record<string, (() => Promise<void>)[]>;
  37. /**
  38. * Events Module
  39. */
  40. public constructor() {
  41. super("events");
  42. this._events = {
  43. test: TestEvent
  44. };
  45. this._subscriptions = {};
  46. this._pSubscriptions = {};
  47. this._socketSubscriptions = {};
  48. this._scheduleCallbacks = {};
  49. }
  50. /**
  51. * startup - Startup events module
  52. */
  53. public override async startup() {
  54. await super.startup();
  55. await this._createPubClient();
  56. await this._createSubClient();
  57. await super._started();
  58. }
  59. /**
  60. * createPubClient - Create redis client for publishing
  61. */
  62. private async _createPubClient() {
  63. this._pubClient = createClient({
  64. ...config.get<RedisClientOptions>("redis"),
  65. socket: {
  66. reconnectStrategy: (retries: number, error) => {
  67. if (
  68. retries >= 10 ||
  69. ![ModuleStatus.STARTING, ModuleStatus.STARTED].includes(
  70. this.getStatus()
  71. )
  72. )
  73. return false;
  74. this.log({
  75. type: "debug",
  76. message: `Redis reconnect attempt ${retries}`,
  77. data: error
  78. });
  79. return Math.min(retries * 50, 500);
  80. }
  81. }
  82. });
  83. this._pubClient.on("error", error => {
  84. this.log({ type: "error", message: error.message, data: error });
  85. this.setStatus(ModuleStatus.ERROR);
  86. });
  87. this._pubClient.on("ready", () => {
  88. this.log({ type: "debug", message: "Redis connection ready" });
  89. if (this.getStatus() === ModuleStatus.ERROR)
  90. this.setStatus(ModuleStatus.STARTED);
  91. });
  92. await this._pubClient.connect();
  93. const redisConfigResponse = await this._pubClient.sendCommand([
  94. "CONFIG",
  95. "GET",
  96. "notify-keyspace-events"
  97. ]);
  98. if (
  99. !(
  100. Array.isArray(redisConfigResponse) &&
  101. redisConfigResponse[1] === "xE"
  102. )
  103. )
  104. throw new Error(
  105. `notify-keyspace-events is NOT configured correctly! It is set to: ${
  106. (Array.isArray(redisConfigResponse) &&
  107. redisConfigResponse[1]) ||
  108. "unknown"
  109. }`
  110. );
  111. }
  112. /**
  113. * createSubClient - Create redis client for subscribing
  114. */
  115. private async _createSubClient() {
  116. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  117. this._subClient = this._pubClient?.duplicate();
  118. this._subClient.on("error", error => {
  119. this.log({ type: "error", message: error.message, data: error });
  120. this.setStatus(ModuleStatus.ERROR);
  121. });
  122. this._subClient.on("ready", () => {
  123. this.log({ type: "debug", message: "Redis connection ready" });
  124. if (this.getStatus() === ModuleStatus.ERROR)
  125. this.setStatus(ModuleStatus.STARTED);
  126. });
  127. await this._subClient.connect();
  128. const { database = 0 } = this._subClient.options ?? {};
  129. await this._subClient.PSUBSCRIBE(
  130. `__keyevent@${database}__:expired`,
  131. async message => {
  132. const type = message.substring(0, message.indexOf("."));
  133. if (type !== "schedule") return;
  134. const channel = message.substring(message.indexOf(".") + 1);
  135. if (!this._scheduleCallbacks[channel]) return;
  136. await forEachIn(this._scheduleCallbacks[channel], callback =>
  137. callback()
  138. );
  139. }
  140. );
  141. }
  142. public getEvent(channel: string) {
  143. const EventClass = this._events[channel];
  144. if (!EventClass)
  145. throw new Error(`Event with channel "${channel}" not found.`);
  146. return EventClass;
  147. }
  148. /**
  149. * createKey - Create hex key
  150. */
  151. private _createKey(type: "event" | "schedule", channel: string) {
  152. if (!["event", "schedule"].includes(type))
  153. throw new Error("Invalid type");
  154. if (!channel || typeof channel !== "string")
  155. throw new Error("Invalid channel");
  156. return `${type}.${channel}`;
  157. }
  158. /**
  159. * publish - Publish an event
  160. */
  161. public async publish(event: typeof Event) {
  162. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  163. const channel = event.getChannel();
  164. const value = (event.constructor as EventDerived).makeMessage();
  165. await this._pubClient.publish(this._createKey("event", channel), value);
  166. }
  167. /**
  168. * subscriptionListener - Listener for event subscriptions
  169. */
  170. private async _subscriptionListener(message: string, key: string) {
  171. const type = key.substring(0, key.indexOf("."));
  172. if (type !== "event") return;
  173. const channel = key.substring(key.indexOf(".") + 1);
  174. const EventClass = this.getEvent(channel);
  175. const parsedMessage = EventClass.parseMessage(message);
  176. const event = new EventClass(parsedMessage);
  177. if (this._subscriptions && this._subscriptions[channel])
  178. await forEachIn(this._subscriptions[channel], async cb =>
  179. cb(event)
  180. );
  181. if (this._pSubscriptions)
  182. await forEachIn(
  183. Object.entries(this._pSubscriptions).filter(([subscription]) =>
  184. new RegExp(subscription).test(channel)
  185. ),
  186. async ([, callbacks]) =>
  187. forEachIn(callbacks, async cb => cb(event))
  188. );
  189. if (!this._socketSubscriptions[channel]) return;
  190. for await (const socketId of this._socketSubscriptions[
  191. channel
  192. ].values()) {
  193. await WebSocketModule.dispatch(socketId, channel, event.getData());
  194. }
  195. }
  196. /**
  197. * subscribe - Subscribe to an event or schedule completion
  198. */
  199. public async subscribe(
  200. type: "event" | "schedule",
  201. channel: string,
  202. callback: (message?: any) => Promise<void>
  203. ) {
  204. if (!this._subClient) throw new Error("Redis subClient unavailable.");
  205. if (type === "schedule") {
  206. if (!this._scheduleCallbacks[channel])
  207. this._scheduleCallbacks[channel] = [];
  208. this._scheduleCallbacks[channel].push(() => callback());
  209. return;
  210. }
  211. if (!this._subscriptions[channel]) {
  212. this._subscriptions[channel] = [];
  213. await this._subClient.subscribe(
  214. this._createKey(type, channel),
  215. (...args) => this._subscriptionListener(...args)
  216. );
  217. }
  218. this._subscriptions[channel].push(callback);
  219. }
  220. /**
  221. * pSubscribe - Subscribe to an event with pattern
  222. */
  223. public async pSubscribe(
  224. pattern: string,
  225. callback: (message?: any) => Promise<void>
  226. ) {
  227. if (!this._subClient) throw new Error("Redis subClient unavailable.");
  228. if (!this._pSubscriptions[pattern]) {
  229. this._pSubscriptions[pattern] = [];
  230. await this._subClient.pSubscribe(
  231. this._createKey("event", pattern),
  232. (...args) => this._subscriptionListener(...args)
  233. );
  234. }
  235. this._pSubscriptions[pattern].push(callback);
  236. }
  237. /**
  238. * unsubscribe - Unsubscribe from an event or schedule completion
  239. */
  240. public async unsubscribe(
  241. type: "event" | "schedule",
  242. channel: string,
  243. callback: (message?: any) => Promise<void>
  244. ) {
  245. if (!this._subClient) throw new Error("Redis subClient unavailable.");
  246. if (type === "schedule") {
  247. if (!this._scheduleCallbacks[channel]) return;
  248. const index = this._scheduleCallbacks[channel].findIndex(
  249. schedule => schedule.toString() === callback.toString()
  250. );
  251. if (index >= 0) this._scheduleCallbacks[channel].splice(index, 1);
  252. return;
  253. }
  254. if (!this._subscriptions[channel]) return;
  255. const index = this._subscriptions[channel].findIndex(
  256. subscription => subscription.toString() === callback.toString()
  257. );
  258. if (index < 0) return;
  259. this._subscriptions[channel].splice(index, 1);
  260. if (this._subscriptions[channel].length === 0) {
  261. delete this._subscriptions[channel];
  262. await this._subClient.unsubscribe(this._createKey(type, channel)); // TODO: Provide callback when unsubscribing
  263. }
  264. }
  265. /**
  266. * schedule - Schedule a callback trigger
  267. */
  268. public async schedule(channel: string, time: number) {
  269. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  270. if (typeof time !== "number") throw new Error("Time must be a number");
  271. time = Math.round(time);
  272. if (time <= 0) throw new Error("Time must be greater than 0");
  273. await this._pubClient.set(this._createKey("schedule", channel), "", {
  274. PX: time,
  275. NX: true
  276. });
  277. }
  278. /**
  279. * unschedule - Unschedule a callback trigger
  280. */
  281. public async unschedule(channel: string) {
  282. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  283. await this._pubClient.del(this._createKey("schedule", channel));
  284. }
  285. public async subscribeSocket(channel: string, socketId: string) {
  286. if (!this._socketSubscriptions[channel]) {
  287. await this.subscribe("event", channel, () => Promise.resolve());
  288. this._socketSubscriptions[channel] = new Set();
  289. }
  290. if (this._socketSubscriptions[channel].has(socketId)) return;
  291. this._socketSubscriptions[channel].add(socketId);
  292. }
  293. public async subscribeManySocket(channels: string[], socketId: string) {
  294. await forEachIn(channels, channel =>
  295. this.subscribeSocket(channel, socketId)
  296. );
  297. }
  298. public async unsubscribeSocket(channel: string, socketId: string) {
  299. if (
  300. !(
  301. this._socketSubscriptions[channel] &&
  302. this._socketSubscriptions[channel].has(socketId)
  303. )
  304. )
  305. return;
  306. this._socketSubscriptions[channel].delete(socketId);
  307. if (this._socketSubscriptions[channel].size === 0)
  308. delete this._socketSubscriptions[channel];
  309. }
  310. public async unsubscribeManySocket(channels: string[], socketId: string) {
  311. await forEachIn(channels, channel =>
  312. this.unsubscribeSocket(channel, socketId)
  313. );
  314. }
  315. public async unsubscribeAllSocket(socketId: string) {
  316. await forEachIn(
  317. Object.entries(this._socketSubscriptions).filter(([, socketIds]) =>
  318. socketIds.has(socketId)
  319. ),
  320. async ([channel]) => this.unsubscribeSocket(channel, socketId)
  321. );
  322. }
  323. /**
  324. * shutdown - Shutdown events module
  325. */
  326. public override async shutdown() {
  327. await super.shutdown();
  328. if (this._pubClient) await this._pubClient.quit();
  329. if (this._subClient) await this._subClient.quit();
  330. this._subscriptions = {};
  331. this._scheduleCallbacks = {};
  332. await this._stopped();
  333. }
  334. }
  335. export default new EventsModule();