index.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. import async from "async";
  2. import config from "config";
  3. import redis from "redis";
  4. import mongoose from "mongoose";
  5. import CoreClass from "../../core";
  6. // Lightweight / convenience wrapper around redis module for our needs
  7. const pubs = {};
  8. const subs = {};
  9. let CacheModule;
  10. class _CacheModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("cache");
  14. CacheModule = this;
  15. }
  16. /**
  17. * Initialises the cache/redis module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. const importSchema = schemaName =>
  23. new Promise(resolve => {
  24. import(`./schemas/${schemaName}`).then(schema => resolve(schema.default));
  25. });
  26. this.schemas = {
  27. session: await importSchema("session"),
  28. station: await importSchema("station"),
  29. playlist: await importSchema("playlist"),
  30. officialPlaylist: await importSchema("officialPlaylist"),
  31. song: await importSchema("song"),
  32. punishment: await importSchema("punishment"),
  33. recentActivity: await importSchema("recentActivity"),
  34. ratings: await importSchema("ratings")
  35. };
  36. return new Promise((resolve, reject) => {
  37. this.url = config.get("redis").url;
  38. this.password = config.get("redis").password;
  39. this.log("INFO", "Connecting...");
  40. this.client = redis.createClient({
  41. url: this.url,
  42. password: this.password,
  43. reconnectStrategy: retries => {
  44. if (this.getStatus() !== "LOCKDOWN") {
  45. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  46. this.log("INFO", `Attempting to reconnect.`);
  47. if (retries >= 10) {
  48. this.log("ERROR", `Stopped trying to reconnect.`);
  49. this.setStatus("FAILED");
  50. new Error("Stopped trying to reconnect.");
  51. } else {
  52. Math.min(retries * 50, 500);
  53. }
  54. }
  55. }
  56. });
  57. this.client.on("error", err => {
  58. if (this.getStatus() === "INITIALIZING") reject(err);
  59. if (this.getStatus() === "LOCKDOWN") return;
  60. this.log("ERROR", `Error ${err.message}.`);
  61. });
  62. this.client.connect().then(async () => {
  63. this.log("INFO", "Connected succesfully.");
  64. if (this.getStatus() === "INITIALIZING") resolve();
  65. else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
  66. });
  67. // TODO move to a better place
  68. CacheModule.runJob("KEYS", { pattern: "longJobs.*" }).then(keys => {
  69. async.eachLimit(keys, 1, (key, next) => {
  70. CacheModule.runJob("DEL", { key }).finally(() => {
  71. next();
  72. });
  73. });
  74. });
  75. });
  76. }
  77. /**
  78. * Quits redis client
  79. *
  80. * @returns {Promise} - returns promise (reject, resolve)
  81. */
  82. QUIT() {
  83. return new Promise(resolve => {
  84. if (CacheModule.client.connected) {
  85. CacheModule.client.quit();
  86. Object.keys(pubs).forEach(channel => pubs[channel].quit());
  87. Object.keys(subs).forEach(channel => subs[channel].client.quit());
  88. }
  89. resolve();
  90. });
  91. }
  92. /**
  93. * Sets a single value in a table
  94. *
  95. * @param {object} payload - object containing payload
  96. * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)
  97. * @param {string} payload.key - name of the key to set
  98. * @param {*} payload.value - the value we want to set
  99. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  100. * @returns {Promise} - returns a promise (resolve, reject)
  101. */
  102. HSET(payload) {
  103. return new Promise((resolve, reject) => {
  104. let { key } = payload;
  105. let { value } = payload;
  106. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  107. // automatically stringify objects and arrays into JSON
  108. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  109. CacheModule.client
  110. .HSET(payload.table, key, value)
  111. .then(() => resolve(JSON.parse(value)))
  112. .catch(err => reject(new Error(err)));
  113. });
  114. }
  115. /**
  116. * Gets a single value from a table
  117. *
  118. * @param {object} payload - object containing payload
  119. * @param {string} payload.table - name of the table to get the value from (table === redis hash)
  120. * @param {string} payload.key - name of the key to fetch
  121. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  122. * @returns {Promise} - returns a promise (resolve, reject)
  123. */
  124. HGET(payload) {
  125. return new Promise((resolve, reject) => {
  126. let { key } = payload;
  127. if (!key) {
  128. reject(new Error("Invalid key!"));
  129. return;
  130. }
  131. if (!payload.table) {
  132. reject(new Error("Invalid table!"));
  133. return;
  134. }
  135. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  136. CacheModule.client
  137. .HGET(payload.table, key, payload.value)
  138. .then(value => {
  139. let parsedValue;
  140. try {
  141. parsedValue = JSON.parse(value);
  142. } catch (err) {
  143. return reject(err);
  144. }
  145. return resolve(parsedValue);
  146. })
  147. .catch(err => reject(new Error(err)));
  148. });
  149. }
  150. /**
  151. * Deletes a single value from a table
  152. *
  153. * @param {object} payload - object containing payload
  154. * @param {string} payload.table - name of the table to delete the value from (table === redis hash)
  155. * @param {string} payload.key - name of the key to delete
  156. * @returns {Promise} - returns a promise (resolve, reject)
  157. */
  158. HDEL(payload) {
  159. return new Promise((resolve, reject) => {
  160. let { key } = payload;
  161. if (!payload.table) {
  162. reject(new Error("Invalid table!"));
  163. return;
  164. }
  165. if (!key) {
  166. reject(new Error("Invalid key!"));
  167. return;
  168. }
  169. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  170. CacheModule.client
  171. .HDEL(payload.table, key)
  172. .then(() => resolve())
  173. .catch(err => reject(new Error(err)));
  174. });
  175. }
  176. /**
  177. * Returns all the keys for a table
  178. *
  179. * @param {object} payload - object containing payload
  180. * @param {string} payload.table - name of the table to get the values from (table === redis hash)
  181. * @param {boolean} [payload.parseJson=true] - attempts to parse all values as JSON by default
  182. * @returns {Promise} - returns a promise (resolve, reject)
  183. */
  184. HGETALL(payload) {
  185. return new Promise((resolve, reject) => {
  186. if (!payload.table) {
  187. reject(new Error("Invalid table!"));
  188. return;
  189. }
  190. CacheModule.client
  191. .HGETALL(payload.table)
  192. .then(obj => {
  193. if (obj)
  194. Object.keys(obj).forEach(key => {
  195. obj[key] = JSON.parse(obj[key]);
  196. });
  197. else if (!obj) obj = [];
  198. resolve(obj);
  199. })
  200. .catch(err => reject(new Error(err)));
  201. });
  202. }
  203. /**
  204. * Deletes a single value
  205. *
  206. * @param {object} payload - object containing payload
  207. * @param {string} payload.key - name of the key to delete
  208. * @returns {Promise} - returns a promise (resolve, reject)
  209. */
  210. DEL(payload) {
  211. return new Promise((resolve, reject) => {
  212. let { key } = payload;
  213. if (!key) {
  214. reject(new Error("Invalid key!"));
  215. return;
  216. }
  217. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  218. CacheModule.client
  219. .DEL(key)
  220. .then(() => resolve())
  221. .catch(err => reject(new Error(err)));
  222. });
  223. }
  224. /**
  225. * Publish a message to a channel, caches the redis client connection
  226. *
  227. * @param {object} payload - object containing payload
  228. * @param {string} payload.channel - the name of the channel we want to publish a message to
  229. * @param {*} payload.value - the value we want to send
  230. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  231. * @returns {Promise} - returns a promise (resolve, reject)
  232. */
  233. PUB(payload) {
  234. return new Promise((resolve, reject) => {
  235. let { value } = payload;
  236. if (!payload.channel) {
  237. reject(new Error("Invalid channel!"));
  238. return;
  239. }
  240. if (!value) {
  241. reject(new Error("Invalid value!"));
  242. return;
  243. }
  244. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  245. CacheModule.client
  246. .publish(payload.channel, value)
  247. .then(() => resolve())
  248. .catch(err => reject(new Error(err)));
  249. });
  250. }
  251. /**
  252. * Subscribe to a channel, caches the redis client connection
  253. *
  254. * @param {object} payload - object containing payload
  255. * @param {string} payload.channel - name of the channel to subscribe to
  256. * @param {boolean} [payload.parseJson=true] - parse the message as JSON
  257. * @returns {Promise} - returns a promise (resolve, reject)
  258. */
  259. SUB(payload) {
  260. return new Promise((resolve, reject) => {
  261. if (!payload.channel) {
  262. reject(new Error("Invalid channel!"));
  263. return;
  264. }
  265. if (subs[payload.channel] === undefined) {
  266. subs[payload.channel] = {
  267. client: redis.createClient({
  268. url: CacheModule.url,
  269. password: CacheModule.password
  270. }),
  271. cbs: []
  272. };
  273. subs[payload.channel].client.connect().then(() => {
  274. subs[payload.channel].client.subscribe(payload.channel, (message, channel) => {
  275. if (message.startsWith("[") || message.startsWith("{"))
  276. try {
  277. message = JSON.parse(message);
  278. } catch (err) {
  279. console.error(err);
  280. }
  281. else if (message.startsWith('"') && message.endsWith('"'))
  282. message = message.substring(1).substring(0, message.length - 2);
  283. subs[channel].cbs.forEach(cb => cb(message));
  284. });
  285. });
  286. }
  287. subs[payload.channel].cbs.push(payload.cb);
  288. resolve();
  289. });
  290. }
  291. /**
  292. * Gets a full list from Redis
  293. *
  294. * @param {object} payload - object containing payload
  295. * @param {string} payload.key - name of the table to get the value from (table === redis hash)
  296. * @returns {Promise} - returns a promise (resolve, reject)
  297. */
  298. LRANGE(payload) {
  299. return new Promise((resolve, reject) => {
  300. let { key } = payload;
  301. if (!key) {
  302. reject(new Error("Invalid key!"));
  303. return;
  304. }
  305. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  306. CacheModule.client
  307. .LRANGE(key, 0, -1)
  308. .then(list => resolve(list))
  309. .catch(err => reject(new Error(err)));
  310. });
  311. }
  312. /**
  313. * Adds a value to a list in Redis
  314. *
  315. * @param {object} payload - object containing payload
  316. * @param {string} payload.key - name of the list
  317. * @param {*} payload.value - the value we want to set
  318. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  319. * @returns {Promise} - returns a promise (resolve, reject)
  320. */
  321. RPUSH(payload) {
  322. return new Promise((resolve, reject) => {
  323. let { key, value } = payload;
  324. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  325. // automatically stringify objects and arrays into JSON
  326. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  327. CacheModule.client
  328. .RPUSH(key, value)
  329. .then(() => resolve())
  330. .catch(err => reject(new Error(err)));
  331. });
  332. }
  333. /**
  334. * Removes a value from a list in Redis
  335. *
  336. * @param {object} payload - object containing payload
  337. * @param {string} payload.key - name of the list
  338. * @param {*} payload.value - the value we want to remove
  339. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  340. * @returns {Promise} - returns a promise (resolve, reject)
  341. */
  342. LREM(payload) {
  343. return new Promise((resolve, reject) => {
  344. let { key, value } = payload;
  345. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  346. // automatically stringify objects and arrays into JSON
  347. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  348. CacheModule.client
  349. .LREM(key, 1, value)
  350. .then(() => resolve())
  351. .catch(err => reject(new Error(err)));
  352. });
  353. }
  354. /**
  355. * Gets a list of keys in Redis with a matching pattern
  356. *
  357. * @param {object} payload - object containing payload
  358. * @param {string} payload.pattern - pattern to search for
  359. * @returns {Promise} - returns a promise (resolve, reject)
  360. */
  361. KEYS(payload) {
  362. return new Promise((resolve, reject) => {
  363. const { pattern } = payload;
  364. CacheModule.client
  365. .KEYS(pattern)
  366. .then(keys => resolve(keys))
  367. .catch(err => reject(new Error(err)));
  368. });
  369. }
  370. /**
  371. * Returns a redis schema
  372. *
  373. * @param {object} payload - object containing the payload
  374. * @param {string} payload.schemaName - the name of the schema to get
  375. * @returns {Promise} - returns promise (reject, resolve)
  376. */
  377. GET_SCHEMA(payload) {
  378. return new Promise(resolve => {
  379. resolve(CacheModule.schemas[payload.schemaName]);
  380. });
  381. }
  382. }
  383. export default new _CacheModule();