EventsModule.ts 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. import {
  2. createClient,
  3. RedisClientOptions,
  4. RedisClientType,
  5. RedisDefaultModules,
  6. RedisFunctions,
  7. RedisModules,
  8. RedisScripts
  9. } from "redis";
  10. import config from "config";
  11. import { forEachIn } from "@common/utils/forEachIn";
  12. import BaseModule, { ModuleStatus } from "@/BaseModule";
  13. import WebSocketModule from "./WebSocketModule";
  14. export class EventsModule extends BaseModule {
  15. private _pubClient?: RedisClientType<
  16. RedisDefaultModules & RedisModules,
  17. RedisFunctions,
  18. RedisScripts
  19. >;
  20. private _subClient?: RedisClientType<
  21. RedisDefaultModules & RedisModules,
  22. RedisFunctions,
  23. RedisScripts
  24. >;
  25. private _subscriptions: Record<string, ((message: any) => Promise<void>)[]>;
  26. private _pSubscriptions: Record<
  27. string,
  28. ((message: any) => Promise<void>)[]
  29. >;
  30. private _socketSubscriptions: Record<string, Set<string>>;
  31. private _scheduleCallbacks: Record<string, (() => Promise<void>)[]>;
  32. /**
  33. * Events Module
  34. */
  35. public constructor() {
  36. super("events");
  37. this._subscriptions = {};
  38. this._pSubscriptions = {};
  39. this._socketSubscriptions = {};
  40. this._scheduleCallbacks = {};
  41. }
  42. /**
  43. * startup - Startup events module
  44. */
  45. public override async startup() {
  46. await super.startup();
  47. await this._createPubClient();
  48. await this._createSubClient();
  49. await super._started();
  50. }
  51. /**
  52. * createPubClient - Create redis client for publishing
  53. */
  54. private async _createPubClient() {
  55. this._pubClient = createClient({
  56. ...config.get<RedisClientOptions>("redis"),
  57. socket: {
  58. reconnectStrategy: (retries: number, error) => {
  59. if (
  60. retries >= 10 ||
  61. ![ModuleStatus.STARTING, ModuleStatus.STARTED].includes(
  62. this.getStatus()
  63. )
  64. )
  65. return false;
  66. this.log({
  67. type: "debug",
  68. message: `Redis reconnect attempt ${retries}`,
  69. data: error
  70. });
  71. return Math.min(retries * 50, 500);
  72. }
  73. }
  74. });
  75. this._pubClient.on("error", error => {
  76. this.log({ type: "error", message: error.message, data: error });
  77. this.setStatus(ModuleStatus.ERROR);
  78. });
  79. this._pubClient.on("ready", () => {
  80. this.log({ type: "debug", message: "Redis connection ready" });
  81. if (this.getStatus() === ModuleStatus.ERROR)
  82. this.setStatus(ModuleStatus.STARTED);
  83. });
  84. await this._pubClient.connect();
  85. const redisConfigResponse = await this._pubClient.sendCommand([
  86. "CONFIG",
  87. "GET",
  88. "notify-keyspace-events"
  89. ]);
  90. if (
  91. !(
  92. Array.isArray(redisConfigResponse) &&
  93. redisConfigResponse[1] === "xE"
  94. )
  95. )
  96. throw new Error(
  97. `notify-keyspace-events is NOT configured correctly! It is set to: ${
  98. (Array.isArray(redisConfigResponse) &&
  99. redisConfigResponse[1]) ||
  100. "unknown"
  101. }`
  102. );
  103. }
  104. /**
  105. * createSubClient - Create redis client for subscribing
  106. */
  107. private async _createSubClient() {
  108. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  109. this._subClient = this._pubClient?.duplicate();
  110. this._subClient.on("error", error => {
  111. this.log({ type: "error", message: error.message, data: error });
  112. this.setStatus(ModuleStatus.ERROR);
  113. });
  114. this._subClient.on("ready", () => {
  115. this.log({ type: "debug", message: "Redis connection ready" });
  116. if (this.getStatus() === ModuleStatus.ERROR)
  117. this.setStatus(ModuleStatus.STARTED);
  118. });
  119. await this._subClient.connect();
  120. const { database = 0 } = this._subClient.options ?? {};
  121. await this._subClient.PSUBSCRIBE(
  122. `__keyevent@${database}__:expired`,
  123. async message => {
  124. const type = message.substring(0, message.indexOf("."));
  125. if (type !== "schedule") return;
  126. const channel = message.substring(message.indexOf(".") + 1);
  127. if (!this._scheduleCallbacks[channel]) return;
  128. await forEachIn(this._scheduleCallbacks[channel], callback =>
  129. callback()
  130. );
  131. }
  132. );
  133. }
  134. /**
  135. * createKey - Create hex key
  136. */
  137. private _createKey(type: "event" | "schedule", channel: string) {
  138. if (!["event", "schedule"].includes(type))
  139. throw new Error("Invalid type");
  140. if (!channel || typeof channel !== "string")
  141. throw new Error("Invalid channel");
  142. return `${type}.${channel}`;
  143. }
  144. /**
  145. * publish - Publish an event
  146. */
  147. public async publish(channel: string, value: any) {
  148. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  149. if (!value) throw new Error("Invalid value");
  150. if (["object", "array"].includes(typeof value))
  151. value = JSON.stringify(value);
  152. await this._pubClient.publish(this._createKey("event", channel), value);
  153. }
  154. /**
  155. * subscriptionListener - Listener for event subscriptions
  156. */
  157. private async _subscriptionListener(message: string, key: string) {
  158. const type = key.substring(0, key.indexOf("."));
  159. if (type !== "event") return;
  160. const channel = key.substring(key.indexOf(".") + 1);
  161. if (message.startsWith("[") || message.startsWith("{"))
  162. try {
  163. message = JSON.parse(message);
  164. } catch (err) {
  165. console.error(err);
  166. }
  167. else if (message.startsWith('"') && message.endsWith('"'))
  168. message = message.substring(1).substring(0, message.length - 2);
  169. if (this._subscriptions && this._subscriptions[channel])
  170. await forEachIn(this._subscriptions[channel], async cb =>
  171. cb(message)
  172. );
  173. if (this._pSubscriptions)
  174. await forEachIn(
  175. Object.entries(this._pSubscriptions).filter(([subscription]) =>
  176. new RegExp(subscription).test(channel)
  177. ),
  178. async ([, callbacks]) =>
  179. forEachIn(callbacks, async cb => cb(message))
  180. );
  181. if (!this._socketSubscriptions[channel]) return;
  182. for await (const socketId of this._socketSubscriptions[
  183. channel
  184. ].values()) {
  185. await WebSocketModule.dispatch(socketId, channel, message);
  186. }
  187. }
  188. /**
  189. * subscribe - Subscribe to an event or schedule completion
  190. */
  191. public async subscribe(
  192. type: "event" | "schedule",
  193. channel: string,
  194. callback: (message?: any) => Promise<void>
  195. ) {
  196. if (!this._subClient) throw new Error("Redis subClient unavailable.");
  197. if (type === "schedule") {
  198. if (!this._scheduleCallbacks[channel])
  199. this._scheduleCallbacks[channel] = [];
  200. this._scheduleCallbacks[channel].push(() => callback());
  201. return;
  202. }
  203. if (!this._subscriptions[channel]) {
  204. this._subscriptions[channel] = [];
  205. await this._subClient.subscribe(
  206. this._createKey(type, channel),
  207. (...args) => this._subscriptionListener(...args)
  208. );
  209. }
  210. this._subscriptions[channel].push(callback);
  211. }
  212. /**
  213. * pSubscribe - Subscribe to an event with pattern
  214. */
  215. public async pSubscribe(
  216. pattern: string,
  217. callback: (message?: any) => Promise<void>
  218. ) {
  219. if (!this._subClient) throw new Error("Redis subClient unavailable.");
  220. if (!this._pSubscriptions[pattern]) {
  221. this._pSubscriptions[pattern] = [];
  222. await this._subClient.pSubscribe(
  223. this._createKey("event", pattern),
  224. (...args) => this._subscriptionListener(...args)
  225. );
  226. }
  227. this._pSubscriptions[pattern].push(callback);
  228. }
  229. /**
  230. * unsubscribe - Unsubscribe from an event or schedule completion
  231. */
  232. public async unsubscribe(
  233. type: "event" | "schedule",
  234. channel: string,
  235. callback: (message?: any) => Promise<void>
  236. ) {
  237. if (!this._subClient) throw new Error("Redis subClient unavailable.");
  238. if (type === "schedule") {
  239. if (!this._scheduleCallbacks[channel]) return;
  240. const index = this._scheduleCallbacks[channel].findIndex(
  241. schedule => schedule.toString() === callback.toString()
  242. );
  243. if (index >= 0) this._scheduleCallbacks[channel].splice(index, 1);
  244. return;
  245. }
  246. if (!this._subscriptions[channel]) return;
  247. const index = this._subscriptions[channel].findIndex(
  248. subscription => subscription.toString() === callback.toString()
  249. );
  250. if (index < 0) return;
  251. this._subscriptions[channel].splice(index, 1);
  252. if (this._subscriptions[channel].length === 0) {
  253. delete this._subscriptions[channel];
  254. await this._subClient.unsubscribe(this._createKey(type, channel)); // TODO: Provide callback when unsubscribing
  255. }
  256. }
  257. /**
  258. * schedule - Schedule a callback trigger
  259. */
  260. public async schedule(channel: string, time: number) {
  261. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  262. if (typeof time !== "number") throw new Error("Time must be a number");
  263. time = Math.round(time);
  264. if (time <= 0) throw new Error("Time must be greater than 0");
  265. await this._pubClient.set(this._createKey("schedule", channel), "", {
  266. PX: time,
  267. NX: true
  268. });
  269. }
  270. /**
  271. * unschedule - Unschedule a callback trigger
  272. */
  273. public async unschedule(channel: string) {
  274. if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
  275. await this._pubClient.del(this._createKey("schedule", channel));
  276. }
  277. public async subscribeSocket(channel: string, socketId: string) {
  278. if (!this._socketSubscriptions[channel]) {
  279. await this.subscribe("event", channel, () => Promise.resolve());
  280. this._socketSubscriptions[channel] = new Set();
  281. }
  282. if (this._socketSubscriptions[channel].has(socketId)) return;
  283. this._socketSubscriptions[channel].add(socketId);
  284. }
  285. public async subscribeManySocket(channels: string[], socketId: string) {
  286. await forEachIn(channels, channel =>
  287. this.subscribeSocket(channel, socketId)
  288. );
  289. }
  290. public async unsubscribeSocket(channel: string, socketId: string) {
  291. if (
  292. !(
  293. this._socketSubscriptions[channel] &&
  294. this._socketSubscriptions[channel].has(socketId)
  295. )
  296. )
  297. return;
  298. this._socketSubscriptions[channel].delete(socketId);
  299. if (this._socketSubscriptions[channel].size === 0)
  300. delete this._socketSubscriptions[channel];
  301. }
  302. public async unsubscribeManySocket(channels: string[], socketId: string) {
  303. await forEachIn(channels, channel =>
  304. this.unsubscribeSocket(channel, socketId)
  305. );
  306. }
  307. public async unsubscribeAllSocket(socketId: string) {
  308. await forEachIn(
  309. Object.entries(this._socketSubscriptions).filter(([, socketIds]) =>
  310. socketIds.has(socketId)
  311. ),
  312. async ([channel]) => this.unsubscribeSocket(channel, socketId)
  313. );
  314. }
  315. /**
  316. * shutdown - Shutdown events module
  317. */
  318. public override async shutdown() {
  319. await super.shutdown();
  320. if (this._pubClient) await this._pubClient.quit();
  321. if (this._subClient) await this._subClient.quit();
  322. this._subscriptions = {};
  323. this._scheduleCallbacks = {};
  324. await this._stopped();
  325. }
  326. }
  327. export default new EventsModule();