123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 |
- import config from "config";
- import redis from "redis";
- import mongoose from "mongoose";
- import CoreClass from "../../core";
- // Lightweight / convenience wrapper around redis module for our needs
- const pubs = {};
- const subs = {};
- let CacheModule;
- class _CacheModule extends CoreClass {
- // eslint-disable-next-line require-jsdoc
- constructor() {
- super("cache");
- CacheModule = this;
- }
- /**
- * Initialises the cache/redis module
- *
- * @returns {Promise} - returns promise (reject, resolve)
- */
- async initialize() {
- const importSchema = schemaName =>
- new Promise(resolve => {
- import(`./schemas/${schemaName}`).then(schema => resolve(schema.default));
- });
- this.schemas = {
- session: await importSchema("session"),
- station: await importSchema("station"),
- playlist: await importSchema("playlist"),
- officialPlaylist: await importSchema("officialPlaylist"),
- song: await importSchema("song"),
- punishment: await importSchema("punishment"),
- recentActivity: await importSchema("recentActivity")
- };
- return new Promise((resolve, reject) => {
- this.url = config.get("redis").url;
- this.password = config.get("redis").password;
- this.log("INFO", "Connecting...");
- this.client = redis.createClient({
- url: this.url,
- password: this.password,
- retry_strategy: options => {
- if (this.getStatus() === "LOCKDOWN") return;
- if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
- this.log("INFO", `Attempting to reconnect.`);
- if (options.attempt >= 10) {
- this.log("ERROR", `Stopped trying to reconnect.`);
- this.setStatus("FAILED");
- }
- }
- });
- this.client.on("error", err => {
- if (this.getStatus() === "INITIALIZING") reject(err);
- if (this.getStatus() === "LOCKDOWN") return;
- this.log("ERROR", `Error ${err.message}.`);
- });
- this.client.on("connect", () => {
- this.log("INFO", "Connected succesfully.");
- if (this.getStatus() === "INITIALIZING") resolve();
- else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
- });
- });
- }
- /**
- * Quits redis client
- *
- * @returns {Promise} - returns promise (reject, resolve)
- */
- QUIT() {
- return new Promise(resolve => {
- if (CacheModule.client.connected) {
- CacheModule.client.quit();
- Object.keys(pubs).forEach(channel => pubs[channel].quit());
- Object.keys(subs).forEach(channel => subs[channel].client.quit());
- }
- resolve();
- });
- }
- /**
- * Sets a single value in a table
- *
- * @param {object} payload - object containing payload
- * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)
- * @param {string} payload.key - name of the key to set
- * @param {*} payload.value - the value we want to set
- * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
- * @returns {Promise} - returns a promise (resolve, reject)
- */
- HSET(payload) {
- return new Promise((resolve, reject) => {
- let { key } = payload;
- let { value } = payload;
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
- // automatically stringify objects and arrays into JSON
- if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
- CacheModule.client.hset(payload.table, key, value, err => {
- if (err) return reject(new Error(err));
- return resolve(JSON.parse(value));
- });
- });
- }
- /**
- * Gets a single value from a table
- *
- * @param {object} payload - object containing payload
- * @param {string} payload.table - name of the table to get the value from (table === redis hash)
- * @param {string} payload.key - name of the key to fetch
- * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
- * @returns {Promise} - returns a promise (resolve, reject)
- */
- HGET(payload) {
- return new Promise((resolve, reject) => {
- let { key } = payload;
- if (!key) {
- reject(new Error("Invalid key!"));
- return;
- }
- if (!payload.table) {
- reject(new Error("Invalid table!"));
- return;
- }
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
- CacheModule.client.hget(payload.table, key, (err, value) => {
- if (err) {
- reject(new Error(err));
- return;
- }
- try {
- value = JSON.parse(value);
- } catch (e) {
- reject(err);
- return;
- }
- resolve(value);
- });
- });
- }
- /**
- * Deletes a single value from a table
- *
- * @param {object} payload - object containing payload
- * @param {string} payload.table - name of the table to delete the value from (table === redis hash)
- * @param {string} payload.key - name of the key to delete
- * @returns {Promise} - returns a promise (resolve, reject)
- */
- HDEL(payload) {
- return new Promise((resolve, reject) => {
- let { key } = payload;
- if (!payload.table) {
- reject(new Error("Invalid table!"));
- return;
- }
- if (!key) {
- reject(new Error("Invalid key!"));
- return;
- }
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
- CacheModule.client.hdel(payload.table, key, err => {
- if (err) {
- reject(new Error(err));
- return;
- }
- resolve();
- });
- });
- }
- /**
- * Returns all the keys for a table
- *
- * @param {object} payload - object containing payload
- * @param {string} payload.table - name of the table to get the values from (table === redis hash)
- * @param {boolean} [payload.parseJson=true] - attempts to parse all values as JSON by default
- * @returns {Promise} - returns a promise (resolve, reject)
- */
- HGETALL(payload) {
- return new Promise((resolve, reject) => {
- if (!payload.table) {
- reject(new Error("Invalid table!"));
- return;
- }
- CacheModule.client.hgetall(payload.table, (err, obj) => {
- if (err) {
- reject(new Error(err));
- return;
- }
- if (obj)
- Object.keys(obj).forEach(key => {
- obj[key] = JSON.parse(obj[key]);
- });
- else if (!obj) obj = [];
- resolve(obj);
- });
- });
- }
- /**
- * Publish a message to a channel, caches the redis client connection
- *
- * @param {object} payload - object containing payload
- * @param {string} payload.channel - the name of the channel we want to publish a message to
- * @param {*} payload.value - the value we want to send
- * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
- * @returns {Promise} - returns a promise (resolve, reject)
- */
- PUB(payload) {
- return new Promise((resolve, reject) => {
- let { value } = payload;
- if (!payload.channel) {
- reject(new Error("Invalid channel!"));
- return;
- }
- if (!value) {
- reject(new Error("Invalid value!"));
- return;
- }
- if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
- CacheModule.client.publish(payload.channel, value, err => {
- if (err) reject(err);
- else resolve();
- });
- });
- }
- /**
- * Subscribe to a channel, caches the redis client connection
- *
- * @param {object} payload - object containing payload
- * @param {string} payload.channel - name of the channel to subscribe to
- * @param {boolean} [payload.parseJson=true] - parse the message as JSON
- * @returns {Promise} - returns a promise (resolve, reject)
- */
- SUB(payload) {
- return new Promise((resolve, reject) => {
- if (!payload.channel) {
- reject(new Error("Invalid channel!"));
- return;
- }
- if (subs[payload.channel] === undefined) {
- subs[payload.channel] = {
- client: redis.createClient({
- url: CacheModule.url,
- password: CacheModule.password
- }),
- cbs: []
- };
- subs[payload.channel].client.on("message", (channel, message) => {
- if (message.startsWith("[") || message.startsWith("{"))
- try {
- message = JSON.parse(message);
- } catch (err) {
- console.error(err);
- }
- else if (message.startsWith('"') && message.endsWith('"'))
- message = message.substring(1).substring(0, message.length - 2);
- return subs[channel].cbs.forEach(cb => cb(message));
- });
- subs[payload.channel].client.subscribe(payload.channel);
- }
- subs[payload.channel].cbs.push(payload.cb);
- resolve();
- });
- }
- /**
- * Returns a redis schema
- *
- * @param {object} payload - object containing the payload
- * @param {string} payload.schemaName - the name of the schema to get
- * @returns {Promise} - returns promise (reject, resolve)
- */
- GET_SCHEMA(payload) {
- return new Promise(resolve => {
- resolve(CacheModule.schemas[payload.schemaName]);
- });
- }
- }
- export default new _CacheModule();
|