Browse Source

feat: Redis reconnection handling

Owen Diffey 1 year ago
parent
commit
b3ac47a22d
1 changed files with 47 additions and 2 deletions
  1. 47 2
      backend/src/modules/EventsModule.ts

+ 47 - 2
backend/src/modules/EventsModule.ts

@@ -1,7 +1,7 @@
 import { createClient, RedisClientType } from "redis";
 import config from "config";
 import crypto from "node:crypto";
-import BaseModule from "@/BaseModule";
+import BaseModule, { ModuleStatus } from "@/BaseModule";
 import { UniqueMethods } from "@/types/Modules";
 import JobContext from "@/JobContext";
 
@@ -41,7 +41,39 @@ export default class EventsModule extends BaseModule {
 	 * createPubClient - Create redis client for publishing
 	 */
 	private async _createPubClient() {
-		this._pubClient = createClient({ ...config.get("redis") });
+		this._pubClient = createClient({
+			...config.get("redis"),
+			reconnectStrategy: (retries: number, error) => {
+				if (
+					retries >= 10 ||
+					![ModuleStatus.STARTING, ModuleStatus.STARTED].includes(
+						this.getStatus()
+					)
+				)
+					return false;
+
+				this.log({
+					type: "debug",
+					message: `Redis reconnect attempt ${retries}`,
+					data: error
+				});
+
+				return Math.min(retries * 50, 500);
+			}
+		});
+
+		this._pubClient.on("error", error => {
+			this.log({ type: "error", message: error.message, data: error });
+
+			this.setStatus(ModuleStatus.ERROR);
+		});
+
+		this._pubClient.on("ready", () => {
+			this.log({ type: "debug", message: "Redis connection ready" });
+
+			if (this.getStatus() === ModuleStatus.ERROR)
+				this.setStatus(ModuleStatus.STARTED);
+		});
 
 		await this._pubClient.connect();
 
@@ -74,6 +106,19 @@ export default class EventsModule extends BaseModule {
 
 		this._subClient = this._pubClient?.duplicate();
 
+		this._subClient.on("error", error => {
+			this.log({ type: "error", message: error.message, data: error });
+
+			this.setStatus(ModuleStatus.ERROR);
+		});
+
+		this._subClient.on("ready", () => {
+			this.log({ type: "debug", message: "Redis connection ready" });
+
+			if (this.getStatus() === ModuleStatus.ERROR)
+				this.setStatus(ModuleStatus.STARTED);
+		});
+
 		await this._subClient.connect();
 
 		const { database = 0 } = this._subClient.options ?? {};