Browse Source

refactor: Support redis(node) v4.2.0

Owen Diffey 2 years ago
parent
commit
2ab89f71da
2 changed files with 180 additions and 202 deletions
  1. 88 100
      backend/logic/cache/index.js
  2. 92 102
      backend/logic/notifications.js

+ 88 - 100
backend/logic/cache/index.js

@@ -51,20 +51,31 @@ class _CacheModule extends CoreClass {
 			this.client = redis.createClient({
 				url: this.url,
 				password: this.password,
-				retry_strategy: options => {
-					if (this.getStatus() === "LOCKDOWN") return;
-					if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
+				reconnectStrategy: retries => {
+					if (this.getStatus() !== "LOCKDOWN") {
+						if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
 
-					this.log("INFO", `Attempting to reconnect.`);
+						this.log("INFO", `Attempting to reconnect.`);
 
-					if (options.attempt >= 10) {
-						this.log("ERROR", `Stopped trying to reconnect.`);
+						if (retries >= 10) {
+							this.log("ERROR", `Stopped trying to reconnect.`);
 
-						this.setStatus("FAILED");
+							this.setStatus("FAILED");
+							new Error("Stopped trying to reconnect.");
+						} else {
+							Math.min(retries * 50, 500);
+						}
 					}
 				}
 			});
 
+			this.client.connect().then(async () => {
+				this.log("INFO", "Connected succesfully.");
+
+				if (this.getStatus() === "INITIALIZING") resolve();
+				else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
+			});
+
 			// TODO move to a better place
 			CacheModule.runJob("KEYS", { pattern: "longJobs.*" }).then(keys => {
 				async.eachLimit(keys, 1, (key, next) => {
@@ -80,13 +91,6 @@ class _CacheModule extends CoreClass {
 
 				this.log("ERROR", `Error ${err.message}.`);
 			});
-
-			this.client.on("connect", () => {
-				this.log("INFO", "Connected succesfully.");
-
-				if (this.getStatus() === "INITIALIZING") resolve();
-				else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
-			});
 		});
 	}
 
@@ -125,10 +129,10 @@ class _CacheModule extends CoreClass {
 			// automatically stringify objects and arrays into JSON
 			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
 
-			CacheModule.client.hset(payload.table, key, value, err => {
-				if (err) return reject(new Error(err));
-				return resolve(JSON.parse(value));
-			});
+			CacheModule.client
+				.hSet(payload.table, key, value)
+				.then(() => resolve(JSON.parse(value)))
+				.catch(err => reject(new Error(err)));
 		});
 	}
 
@@ -155,20 +159,19 @@ class _CacheModule extends CoreClass {
 			}
 			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
 
-			CacheModule.client.hget(payload.table, key, (err, value) => {
-				if (err) {
-					reject(new Error(err));
-					return;
-				}
-				try {
-					value = JSON.parse(value);
-				} catch (e) {
-					reject(err);
-					return;
-				}
+			CacheModule.client
+				.hGet(payload.table, key, payload.value)
+				.then(value => {
+					let parsedValue;
+					try {
+						parsedValue = JSON.parse(value);
+					} catch (err) {
+						return reject(err);
+					}
 
-				resolve(value);
-			});
+					return resolve(parsedValue);
+				})
+				.catch(err => reject(new Error(err)));
 		});
 	}
 
@@ -195,13 +198,10 @@ class _CacheModule extends CoreClass {
 
 			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
 
-			CacheModule.client.hdel(payload.table, key, err => {
-				if (err) {
-					reject(new Error(err));
-					return;
-				}
-				resolve();
-			});
+			CacheModule.client
+				.hDel(payload.table, key)
+				.then(() => resolve())
+				.catch(err => reject(new Error(err)));
 		});
 	}
 
@@ -220,19 +220,17 @@ class _CacheModule extends CoreClass {
 				return;
 			}
 
-			CacheModule.client.hgetall(payload.table, (err, obj) => {
-				if (err) {
-					reject(new Error(err));
-					return;
-				}
-				if (obj)
-					Object.keys(obj).forEach(key => {
-						obj[key] = JSON.parse(obj[key]);
-					});
-				else if (!obj) obj = [];
-
-				resolve(obj);
-			});
+			CacheModule.client
+				.hGetAll(payload.table)
+				.then(obj => {
+					if (obj)
+						Object.keys(obj).forEach(key => {
+							obj[key] = JSON.parse(obj[key]);
+						});
+					else if (!obj) obj = [];
+					resolve(obj);
+				})
+				.catch(err => reject(new Error(err)));
 		});
 	}
 
@@ -254,13 +252,10 @@ class _CacheModule extends CoreClass {
 
 			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
 
-			CacheModule.client.del(key, err => {
-				if (err) {
-					reject(new Error(err));
-					return;
-				}
-				resolve();
-			});
+			CacheModule.client
+				.del(key)
+				.then(() => resolve())
+				.catch(err => reject(new Error(err)));
 		});
 	}
 
@@ -288,10 +283,10 @@ class _CacheModule extends CoreClass {
 
 			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
 
-			CacheModule.client.publish(payload.channel, value, err => {
-				if (err) reject(err);
-				else resolve();
-			});
+			CacheModule.client
+				.publish(payload.channel, value)
+				.then(() => resolve())
+				.catch(err => reject(new Error(err)));
 		});
 	}
 
@@ -318,21 +313,20 @@ class _CacheModule extends CoreClass {
 					}),
 					cbs: []
 				};
-
-				subs[payload.channel].client.on("message", (channel, message) => {
-					if (message.startsWith("[") || message.startsWith("{"))
-						try {
-							message = JSON.parse(message);
-						} catch (err) {
-							console.error(err);
-						}
-					else if (message.startsWith('"') && message.endsWith('"'))
-						message = message.substring(1).substring(0, message.length - 2);
-
-					return subs[channel].cbs.forEach(cb => cb(message));
+				subs[payload.channel].client.connect().then(() => {
+					subs[payload.channel].client.subscribe(payload.channel, (message, channel) => {
+						if (message.startsWith("[") || message.startsWith("{"))
+							try {
+								message = JSON.parse(message);
+							} catch (err) {
+								console.error(err);
+							}
+						else if (message.startsWith('"') && message.endsWith('"'))
+							message = message.substring(1).substring(0, message.length - 2);
+
+						subs[channel].cbs.forEach(cb => cb(message));
+					});
 				});
-
-				subs[payload.channel].client.subscribe(payload.channel);
 			}
 
 			subs[payload.channel].cbs.push(payload.cb);
@@ -358,14 +352,10 @@ class _CacheModule extends CoreClass {
 			}
 			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
 
-			CacheModule.client.LRANGE(key, 0, -1, (err, list) => {
-				if (err) {
-					reject(new Error(err));
-					return;
-				}
-
-				resolve(list);
-			});
+			CacheModule.client
+				.LRANGE(key, 0, -1)
+				.then(list => resolve(list))
+				.catch(err => reject(new Error(err)));
 		});
 	}
 
@@ -380,17 +370,16 @@ class _CacheModule extends CoreClass {
 	 */
 	RPUSH(payload) {
 		return new Promise((resolve, reject) => {
-			let { key } = payload;
-			let { value } = payload;
+			let { key, value } = payload;
 
 			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
 			// automatically stringify objects and arrays into JSON
 			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
 
-			CacheModule.client.RPUSH(key, value, err => {
-				if (err) return reject(new Error(err));
-				return resolve();
-			});
+			CacheModule.client
+				.RPUSH(key, value)
+				.then(() => resolve())
+				.catch(err => reject(new Error(err)));
 		});
 	}
 
@@ -405,17 +394,16 @@ class _CacheModule extends CoreClass {
 	 */
 	LREM(payload) {
 		return new Promise((resolve, reject) => {
-			let { key } = payload;
-			let { value } = payload;
+			let { key, value } = payload;
 
 			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
 			// automatically stringify objects and arrays into JSON
 			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
 
-			CacheModule.client.LREM(key, 1, value, err => {
-				if (err) return reject(new Error(err));
-				return resolve();
-			});
+			CacheModule.client
+				.LREM(key, 1, value)
+				.then(() => resolve())
+				.catch(err => reject(new Error(err)));
 		});
 	}
 
@@ -430,10 +418,10 @@ class _CacheModule extends CoreClass {
 		return new Promise((resolve, reject) => {
 			const { pattern } = payload;
 
-			CacheModule.client.KEYS(pattern, (err, keys) => {
-				if (err) return reject(new Error(err));
-				return resolve(keys);
-			});
+			CacheModule.client
+				.KEYS(pattern)
+				.then(keys => resolve(keys))
+				.catch(err => reject(new Error(err)));
 		});
 	}
 

+ 92 - 102
backend/logic/notifications.js

@@ -6,7 +6,6 @@ import redis from "redis";
 import CoreClass from "../core";
 
 let NotificationsModule;
-let UtilsModule;
 
 class _NotificationsModule extends CoreClass {
 	// eslint-disable-next-line require-jsdoc
@@ -28,124 +27,119 @@ class _NotificationsModule extends CoreClass {
 			const url = (this.url = config.get("redis").url);
 			const password = (this.password = config.get("redis").password);
 
-			UtilsModule = this.moduleManager.modules.utils;
-
 			this.pub = redis.createClient({
 				url,
 				password,
-				retry_strategy: options => {
-					if (this.getStatus() === "LOCKDOWN") return;
-					if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
+				reconnectStrategy: retries => {
+					if (this.getStatus() !== "LOCKDOWN") {
+						if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
+
+						this.log("INFO", `Attempting to reconnect.`);
 
-					this.log("INFO", `Attempting to reconnect.`);
+						if (retries >= 10) {
+							this.log("ERROR", `Stopped trying to reconnect.`);
 
-					if (options.attempt >= 10) {
-						this.log("ERROR", `Stopped trying to reconnect.`);
+							this.setStatus("FAILED");
 
-						this.setStatus("FAILED");
+							new Error("Stopped trying to reconnect.");
+						} else {
+							Math.min(retries * 50, 500);
+						}
 					}
 				}
 			});
-			this.sub = redis.createClient({
-				url,
-				password,
-				retry_strategy: options => {
-					if (this.getStatus() === "LOCKDOWN") return;
-					if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
 
-					this.log("INFO", `Attempting to reconnect.`);
+			this.pub.connect().then(async () => {
+				this.log("INFO", "Pub connected succesfully.");
 
-					if (options.attempt >= 10) {
-						this.log("ERROR", `Stopped trying to reconnect.`);
+				this.pub
+					.sendCommand(["CONFIG", "GET", "notify-keyspace-events"])
+					.then(response => {
+						if (response[1] === "xE") {
+							this.log("INFO", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is set correctly`);
+							this.log("STATION_ISSUE", `notify-keyspace-events is set correctly`);
+						} else {
+							this.log(
+								"ERROR",
+								"NOTIFICATIONS_INITIALIZE",
+								`notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
+							);
+							this.log(
+								"STATION_ISSUE",
+								`notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
+							);
+						}
+					})
+					.catch(err => {
+						this.log(
+							"ERROR",
+							"NOTIFICATIONS_INITIALIZE",
+							`Getting notify-keyspace-events gave an error. ${err}`
+						);
+						this.log("STATION_ISSUE", `Getting notify-keyspace-events gave an error. ${err}.`);
+					});
 
-						this.setStatus("FAILED");
-					}
-				}
+				if (this.getStatus() === "INITIALIZING") resolve();
+				else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
+					this.setStatus("INITIALIZED");
 			});
 
-			this.sub.on("error", err => {
-				if (this.getStatus() === "INITIALIZING") reject(err);
-				if (this.getStatus() === "LOCKDOWN") return;
+			this.sub = redis.createClient({
+				url,
+				password,
+				reconnectStrategy: retries => {
+					if (this.getStatus() !== "LOCKDOWN") {
+						if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
 
-				this.log("ERROR", `Error ${err.message}.`);
-			});
+						this.log("INFO", `Attempting to reconnect.`);
 
-			this.pub.on("error", err => {
-				if (this.getStatus() === "INITIALIZING") reject(err);
-				if (this.getStatus() === "LOCKDOWN") return;
+						if (retries >= 10) {
+							this.log("ERROR", `Stopped trying to reconnect.`);
 
-				this.log("ERROR", `Error ${err.message}.`);
+							this.setStatus("FAILED");
+
+							new Error("Stopped trying to reconnect.");
+						} else {
+							Math.min(retries * 50, 500);
+						}
+					}
+				}
 			});
 
-			this.sub.on("connect", () => {
+			this.sub.connect().then(async () => {
 				this.log("INFO", "Sub connected succesfully.");
 
 				if (this.getStatus() === "INITIALIZING") resolve();
 				else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
 					this.setStatus("READY");
-			});
 
-			this.pub.on("connect", () => {
-				this.log("INFO", "Pub connected succesfully.");
+				this.sub.pSubscribe(`__keyevent@${this.pub.options.db}__:expired`, (message, channel) => {
+					this.log("STATION_ISSUE", `PMESSAGE1 - Channel: ${channel}; ExpiredKey: ${message}`);
 
-				this.pub.config("GET", "notify-keyspace-events", async (err, response) => {
-					if (err) {
-						const formattedErr = await UtilsModule.runJob(
-							"GET_ERROR",
-							{
-								error: err
-							},
-							this
-						);
-						this.log(
-							"ERROR",
-							"NOTIFICATIONS_INITIALIZE",
-							`Getting notify-keyspace-events gave an error. ${formattedErr}`
-						);
+					this.subscriptions.forEach(sub => {
 						this.log(
 							"STATION_ISSUE",
-							`Getting notify-keyspace-events gave an error. ${formattedErr}. ${response}`
+							`PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${!(sub.name !== message)}`
 						);
-						return;
-					}
-					if (response[1] === "xE") {
-						this.log("INFO", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is set correctly`);
-						this.log("STATION_ISSUE", `notify-keyspace-events is set correctly`);
-					} else {
-						this.log(
-							"ERROR",
-							"NOTIFICATIONS_INITIALIZE",
-							`notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
-						);
-						this.log(
-							"STATION_ISSUE",
-							`notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
-						);
-					}
+						if (sub.name !== message) return;
+						sub.cb();
+					});
 				});
-
-				if (this.getStatus() === "INITIALIZING") resolve();
-				else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
-					this.setStatus("INITIALIZED");
 			});
 
-			this.sub.on("pmessage", (pattern, channel, expiredKey) => {
-				this.log(
-					"STATION_ISSUE",
-					`PMESSAGE1 - Pattern: ${pattern}; Channel: ${channel}; ExpiredKey: ${expiredKey}`
-				);
+			this.sub.on("error", err => {
+				if (this.getStatus() === "INITIALIZING") reject(err);
+				if (this.getStatus() === "LOCKDOWN") return;
 
-				this.subscriptions.forEach(sub => {
-					this.log(
-						"STATION_ISSUE",
-						`PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${!(sub.name !== expiredKey)}`
-					);
-					if (sub.name !== expiredKey) return;
-					sub.cb();
-				});
+				this.log("ERROR", `Error ${err.message}.`);
 			});
 
-			this.sub.psubscribe(`__keyevent@${this.pub.options.db}__:expired`);
+			this.pub.on("error", err => {
+				if (this.getStatus() === "INITIALIZING") reject(err);
+				if (this.getStatus() === "LOCKDOWN") return;
+
+				this.log("ERROR", `Error ${err.message}.`);
+			});
 		});
 	}
 
@@ -172,17 +166,16 @@ class _NotificationsModule extends CoreClass {
 						.update(`_notification:${payload.name}_`)
 						.digest("hex")}; StationId: ${payload.station._id}; StationName: ${payload.station.name}`
 				);
-				NotificationsModule.pub.set(
-					crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
-					"",
-					"PX",
-					time,
-					"NX",
-					err => {
-						if (err) reject(err);
-						else resolve();
-					}
-				);
+				NotificationsModule.pub
+					.set(
+						crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
+						"",
+						"PX",
+						time,
+						"NX"
+					)
+					.then(() => resolve())
+					.catch(err => reject(new Error(err)));
 			}
 		});
 	}
@@ -266,13 +259,10 @@ class _NotificationsModule extends CoreClass {
 					.update(`_notification:${payload.name}_`)
 					.digest("hex")}`
 			);
-			NotificationsModule.pub.del(
-				crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
-				err => {
-					if (err) reject(err);
-					else resolve();
-				}
-			);
+			NotificationsModule.pub
+				.del(crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"))
+				.then(() => resolve())
+				.catch(err => reject(new Error(err)));
 		});
 	}
 }