index.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. import async from "async";
  2. import config from "config";
  3. import redis from "redis";
  4. import mongoose from "mongoose";
  5. import CoreClass from "../../core";
// Lightweight / convenience wrapper around redis module for our needs

// Publisher clients keyed by channel name. Nothing in this file adds entries; QUIT closes whatever is present.
const pubs = {};
// Subscriber state keyed by channel name: { client, cbs } entries created by the SUB job.
const subs = {};
// Module singleton; assigned in the _CacheModule constructor so jobs can reach the instance without relying on `this`.
let CacheModule;
  10. class _CacheModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("cache");
  14. CacheModule = this;
  15. }
  16. /**
  17. * Initialises the cache/redis module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. const importSchema = schemaName =>
  23. new Promise(resolve => {
  24. import(`./schemas/${schemaName}`).then(schema => resolve(schema.default));
  25. });
  26. this.schemas = {
  27. session: await importSchema("session"),
  28. station: await importSchema("station"),
  29. playlist: await importSchema("playlist"),
  30. officialPlaylist: await importSchema("officialPlaylist"),
  31. song: await importSchema("song"),
  32. punishment: await importSchema("punishment"),
  33. recentActivity: await importSchema("recentActivity"),
  34. ratings: await importSchema("ratings")
  35. };
  36. return new Promise((resolve, reject) => {
  37. this.log("INFO", "Connecting...");
  38. this.client = redis.createClient({
  39. ...config.get("redis"),
  40. reconnectStrategy: retries => {
  41. if (this.getStatus() !== "LOCKDOWN") {
  42. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  43. this.log("INFO", `Attempting to reconnect.`);
  44. if (retries >= 10) {
  45. this.log("ERROR", `Stopped trying to reconnect.`);
  46. this.setStatus("FAILED");
  47. new Error("Stopped trying to reconnect.");
  48. } else {
  49. Math.min(retries * 50, 500);
  50. }
  51. }
  52. }
  53. });
  54. this.client.on("error", err => {
  55. if (this.getStatus() === "INITIALIZING") reject(err);
  56. if (this.getStatus() === "LOCKDOWN") return;
  57. this.log("ERROR", `Error ${err.message}.`);
  58. });
  59. this.client.on("ready", () => {
  60. this.log("INFO", "Redis is ready.");
  61. if (this.getStatus() === "INITIALIZING") resolve();
  62. else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
  63. });
  64. this.client.connect().then(async () => {
  65. this.log("INFO", "Connected succesfully.");
  66. });
  67. // TODO move to a better place
  68. CacheModule.runJob("KEYS", { pattern: "longJobs.*" }).then(keys => {
  69. async.eachLimit(keys, 1, (key, next) => {
  70. CacheModule.runJob("DEL", { key }).finally(() => {
  71. next();
  72. });
  73. });
  74. });
  75. });
  76. }
  77. /**
  78. * Quits redis client
  79. *
  80. * @returns {Promise} - returns promise (reject, resolve)
  81. */
  82. QUIT() {
  83. return new Promise(resolve => {
  84. if (CacheModule.client.connected) {
  85. CacheModule.client.quit();
  86. Object.keys(pubs).forEach(channel => pubs[channel].quit());
  87. Object.keys(subs).forEach(channel => subs[channel].client.quit());
  88. }
  89. resolve();
  90. });
  91. }
  92. /**
  93. * Sets a single value
  94. *
  95. * @param {object} payload - object containing payload
  96. * @param {string} payload.key - name of the key to set
  97. * @param {*} payload.value - the value we want to set
  98. * @param {number} payload.ttl - ttl of the key in seconds
  99. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  100. * @returns {Promise} - returns a promise (resolve, reject)
  101. */
  102. SET(payload) {
  103. return new Promise((resolve, reject) => {
  104. let { key, value } = payload;
  105. const { ttl } = payload;
  106. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  107. // automatically stringify objects and arrays into JSON
  108. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  109. let options = null;
  110. if (ttl) {
  111. options = {
  112. EX: ttl
  113. };
  114. }
  115. CacheModule.client
  116. .SET(key, value, options)
  117. .then(() => {
  118. let parsed = value;
  119. try {
  120. parsed = JSON.parse(value);
  121. } catch {
  122. // Do nothing
  123. }
  124. resolve(parsed);
  125. })
  126. .catch(err => reject(new Error(err)));
  127. });
  128. }
  129. /**
  130. * Sets a single value in a table
  131. *
  132. * @param {object} payload - object containing payload
  133. * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)
  134. * @param {string} payload.key - name of the key to set
  135. * @param {*} payload.value - the value we want to set
  136. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  137. * @returns {Promise} - returns a promise (resolve, reject)
  138. */
  139. HSET(payload) {
  140. return new Promise((resolve, reject) => {
  141. let { key } = payload;
  142. let { value } = payload;
  143. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  144. // automatically stringify objects and arrays into JSON
  145. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  146. CacheModule.client
  147. .HSET(payload.table, key, value)
  148. .then(() => resolve(JSON.parse(value)))
  149. .catch(err => reject(new Error(err)));
  150. });
  151. }
  152. /**
  153. * Gets a single value
  154. *
  155. * @param {object} payload - object containing payload
  156. * @param {string} payload.key - name of the key to fetch
  157. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  158. * @returns {Promise} - returns a promise (resolve, reject)
  159. */
  160. GET(payload) {
  161. return new Promise((resolve, reject) => {
  162. let { key } = payload;
  163. if (!key) {
  164. reject(new Error("Invalid key!"));
  165. return;
  166. }
  167. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  168. CacheModule.client
  169. .GET(key, payload.value)
  170. .then(value => {
  171. if (value && !value.startsWith("{") && !value.startsWith("[")) return resolve(value);
  172. let parsedValue;
  173. try {
  174. parsedValue = JSON.parse(value);
  175. } catch (err) {
  176. return reject(err);
  177. }
  178. return resolve(parsedValue);
  179. })
  180. .catch(err => reject(new Error(err)));
  181. });
  182. }
  183. /**
  184. * Gets a single value from a table
  185. *
  186. * @param {object} payload - object containing payload
  187. * @param {string} payload.table - name of the table to get the value from (table === redis hash)
  188. * @param {string} payload.key - name of the key to fetch
  189. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  190. * @returns {Promise} - returns a promise (resolve, reject)
  191. */
  192. HGET(payload) {
  193. return new Promise((resolve, reject) => {
  194. let { key } = payload;
  195. if (!key) {
  196. reject(new Error("Invalid key!"));
  197. return;
  198. }
  199. if (!payload.table) {
  200. reject(new Error("Invalid table!"));
  201. return;
  202. }
  203. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  204. CacheModule.client
  205. .HGET(payload.table, key, payload.value)
  206. .then(value => {
  207. let parsedValue;
  208. try {
  209. parsedValue = JSON.parse(value);
  210. } catch (err) {
  211. return reject(err);
  212. }
  213. return resolve(parsedValue);
  214. })
  215. .catch(err => reject(new Error(err)));
  216. });
  217. }
  218. /**
  219. * Deletes a single value from a table
  220. *
  221. * @param {object} payload - object containing payload
  222. * @param {string} payload.table - name of the table to delete the value from (table === redis hash)
  223. * @param {string} payload.key - name of the key to delete
  224. * @returns {Promise} - returns a promise (resolve, reject)
  225. */
  226. HDEL(payload) {
  227. return new Promise((resolve, reject) => {
  228. let { key } = payload;
  229. if (!payload.table) {
  230. reject(new Error("Invalid table!"));
  231. return;
  232. }
  233. if (!key) {
  234. reject(new Error("Invalid key!"));
  235. return;
  236. }
  237. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  238. CacheModule.client
  239. .HDEL(payload.table, key)
  240. .then(() => resolve())
  241. .catch(err => reject(new Error(err)));
  242. });
  243. }
  244. /**
  245. * Returns all the keys for a table
  246. *
  247. * @param {object} payload - object containing payload
  248. * @param {string} payload.table - name of the table to get the values from (table === redis hash)
  249. * @param {boolean} [payload.parseJson=true] - attempts to parse all values as JSON by default
  250. * @returns {Promise} - returns a promise (resolve, reject)
  251. */
  252. HGETALL(payload) {
  253. return new Promise((resolve, reject) => {
  254. if (!payload.table) {
  255. reject(new Error("Invalid table!"));
  256. return;
  257. }
  258. CacheModule.client
  259. .HGETALL(payload.table)
  260. .then(obj => {
  261. if (obj)
  262. Object.keys(obj).forEach(key => {
  263. obj[key] = JSON.parse(obj[key]);
  264. });
  265. else if (!obj) obj = [];
  266. resolve(obj);
  267. })
  268. .catch(err => reject(new Error(err)));
  269. });
  270. }
  271. /**
  272. * Deletes a single value
  273. *
  274. * @param {object} payload - object containing payload
  275. * @param {string} payload.key - name of the key to delete
  276. * @returns {Promise} - returns a promise (resolve, reject)
  277. */
  278. DEL(payload) {
  279. return new Promise((resolve, reject) => {
  280. let { key } = payload;
  281. if (!key) {
  282. reject(new Error("Invalid key!"));
  283. return;
  284. }
  285. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  286. CacheModule.client
  287. .DEL(key)
  288. .then(() => resolve())
  289. .catch(err => reject(new Error(err)));
  290. });
  291. }
  292. /**
  293. * Publish a message to a channel, caches the redis client connection
  294. *
  295. * @param {object} payload - object containing payload
  296. * @param {string} payload.channel - the name of the channel we want to publish a message to
  297. * @param {*} payload.value - the value we want to send
  298. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  299. * @returns {Promise} - returns a promise (resolve, reject)
  300. */
  301. PUB(payload) {
  302. return new Promise((resolve, reject) => {
  303. let { value } = payload;
  304. if (!payload.channel) {
  305. reject(new Error("Invalid channel!"));
  306. return;
  307. }
  308. if (!value) {
  309. reject(new Error("Invalid value!"));
  310. return;
  311. }
  312. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  313. CacheModule.client
  314. .publish(payload.channel, value)
  315. .then(() => resolve())
  316. .catch(err => reject(new Error(err)));
  317. });
  318. }
  319. /**
  320. * Subscribe to a channel, caches the redis client connection
  321. *
  322. * @param {object} payload - object containing payload
  323. * @param {string} payload.channel - name of the channel to subscribe to
  324. * @param {boolean} [payload.parseJson=true] - parse the message as JSON
  325. * @returns {Promise} - returns a promise (resolve, reject)
  326. */
  327. SUB(payload) {
  328. return new Promise((resolve, reject) => {
  329. if (!payload.channel) {
  330. reject(new Error("Invalid channel!"));
  331. return;
  332. }
  333. if (subs[payload.channel] === undefined) {
  334. subs[payload.channel] = {
  335. client: redis.createClient(config.get("redis")),
  336. cbs: []
  337. };
  338. subs[payload.channel].client.connect().then(() => {
  339. subs[payload.channel].client.subscribe(payload.channel, (message, channel) => {
  340. if (message.startsWith("[") || message.startsWith("{"))
  341. try {
  342. message = JSON.parse(message);
  343. } catch (err) {
  344. console.error(err);
  345. }
  346. else if (message.startsWith('"') && message.endsWith('"'))
  347. message = message.substring(1).substring(0, message.length - 2);
  348. subs[channel].cbs.forEach(cb => cb(message));
  349. });
  350. });
  351. }
  352. subs[payload.channel].cbs.push(payload.cb);
  353. resolve();
  354. });
  355. }
  356. /**
  357. * Gets a full list from Redis
  358. *
  359. * @param {object} payload - object containing payload
  360. * @param {string} payload.key - name of the table to get the value from (table === redis hash)
  361. * @returns {Promise} - returns a promise (resolve, reject)
  362. */
  363. LRANGE(payload) {
  364. return new Promise((resolve, reject) => {
  365. let { key } = payload;
  366. if (!key) {
  367. reject(new Error("Invalid key!"));
  368. return;
  369. }
  370. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  371. CacheModule.client
  372. .LRANGE(key, 0, -1)
  373. .then(list => resolve(list))
  374. .catch(err => reject(new Error(err)));
  375. });
  376. }
  377. /**
  378. * Adds a value to a list in Redis
  379. *
  380. * @param {object} payload - object containing payload
  381. * @param {string} payload.key - name of the list
  382. * @param {*} payload.value - the value we want to set
  383. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  384. * @returns {Promise} - returns a promise (resolve, reject)
  385. */
  386. RPUSH(payload) {
  387. return new Promise((resolve, reject) => {
  388. let { key, value } = payload;
  389. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  390. // automatically stringify objects and arrays into JSON
  391. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  392. CacheModule.client
  393. .RPUSH(key, value)
  394. .then(() => resolve())
  395. .catch(err => reject(new Error(err)));
  396. });
  397. }
  398. /**
  399. * Adds a value to a list in Redis using LPUSH
  400. *
  401. * @param {object} payload - object containing payload
  402. * @param {string} payload.key - name of the list
  403. * @param {*} payload.value - the value we want to set
  404. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  405. * @returns {Promise} - returns a promise (resolve, reject)
  406. */
  407. LPUSH(payload) {
  408. return new Promise((resolve, reject) => {
  409. let { key, value } = payload;
  410. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  411. // automatically stringify objects and arrays into JSON
  412. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  413. CacheModule.client
  414. .LPUSH(key, value)
  415. .then(() => resolve())
  416. .catch(err => reject(new Error(err)));
  417. });
  418. }
  419. /**
  420. * Gets the length of a Redis list
  421. *
  422. * @param {object} payload - object containing payload
  423. * @param {string} payload.key - name of the list
  424. * @returns {Promise} - returns a promise (resolve, reject)
  425. */
  426. LLEN(payload) {
  427. return new Promise((resolve, reject) => {
  428. const { key } = payload;
  429. CacheModule.client
  430. .LLEN(key)
  431. .then(len => resolve(len))
  432. .catch(err => reject(new Error(err)));
  433. });
  434. }
  435. /**
  436. * Removes an item from a list using RPOP
  437. *
  438. * @param {object} payload - object containing payload
  439. * @param {string} payload.key - name of the list
  440. * @returns {Promise} - returns a promise (resolve, reject)
  441. */
  442. RPOP(payload) {
  443. return new Promise((resolve, reject) => {
  444. const { key } = payload;
  445. CacheModule.client
  446. .RPOP(key)
  447. .then(() => resolve())
  448. .catch(err => reject(new Error(err)));
  449. });
  450. }
  451. /**
  452. * Removes a value from a list in Redis
  453. *
  454. * @param {object} payload - object containing payload
  455. * @param {string} payload.key - name of the list
  456. * @param {*} payload.value - the value we want to remove
  457. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  458. * @returns {Promise} - returns a promise (resolve, reject)
  459. */
  460. LREM(payload) {
  461. return new Promise((resolve, reject) => {
  462. let { key, value } = payload;
  463. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  464. // automatically stringify objects and arrays into JSON
  465. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  466. CacheModule.client
  467. .LREM(key, 1, value)
  468. .then(() => resolve())
  469. .catch(err => reject(new Error(err)));
  470. });
  471. }
  472. /**
  473. * Gets a list of keys in Redis with a matching pattern
  474. *
  475. * @param {object} payload - object containing payload
  476. * @param {string} payload.pattern - pattern to search for
  477. * @returns {Promise} - returns a promise (resolve, reject)
  478. */
  479. KEYS(payload) {
  480. return new Promise((resolve, reject) => {
  481. const { pattern } = payload;
  482. CacheModule.client
  483. .KEYS(pattern)
  484. .then(keys => resolve(keys))
  485. .catch(err => reject(new Error(err)));
  486. });
  487. }
  488. /**
  489. * Returns a redis schema
  490. *
  491. * @param {object} payload - object containing the payload
  492. * @param {string} payload.schemaName - the name of the schema to get
  493. * @returns {Promise} - returns promise (reject, resolve)
  494. */
  495. GET_SCHEMA(payload) {
  496. return new Promise(resolve => {
  497. resolve(CacheModule.schemas[payload.schemaName]);
  498. });
  499. }
  500. }
// Export a single shared instance; constructing it also assigns the module-level CacheModule reference.
export default new _CacheModule();