index.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584
  1. import async from "async";
  2. import config from "config";
  3. import redis from "redis";
  4. import mongoose from "mongoose";
  5. import CoreClass from "../../core";
  6. // Lightweight / convenience wrapper around redis module for our needs
  7. const pubs = {};
  8. const subs = {};
  9. let CacheModule;
  10. class _CacheModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("cache");
  14. CacheModule = this;
  15. }
  16. /**
  17. * Initialises the cache/redis module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. const importSchema = schemaName =>
  23. new Promise(resolve => {
  24. import(`./schemas/${schemaName}`).then(schema => resolve(schema.default));
  25. });
  26. this.schemas = {
  27. session: await importSchema("session"),
  28. station: await importSchema("station"),
  29. playlist: await importSchema("playlist"),
  30. officialPlaylist: await importSchema("officialPlaylist"),
  31. song: await importSchema("song"),
  32. punishment: await importSchema("punishment"),
  33. recentActivity: await importSchema("recentActivity"),
  34. ratings: await importSchema("ratings")
  35. };
  36. return new Promise((resolve, reject) => {
  37. this.url = config.get("redis").url;
  38. this.password = config.get("redis").password;
  39. this.log("INFO", "Connecting...");
  40. this.client = redis.createClient({
  41. url: this.url,
  42. password: this.password,
  43. reconnectStrategy: retries => {
  44. if (this.getStatus() !== "LOCKDOWN") {
  45. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  46. this.log("INFO", `Attempting to reconnect.`);
  47. if (retries >= 10) {
  48. this.log("ERROR", `Stopped trying to reconnect.`);
  49. this.setStatus("FAILED");
  50. new Error("Stopped trying to reconnect.");
  51. } else {
  52. Math.min(retries * 50, 500);
  53. }
  54. }
  55. }
  56. });
  57. this.client.on("error", err => {
  58. if (this.getStatus() === "INITIALIZING") reject(err);
  59. if (this.getStatus() === "LOCKDOWN") return;
  60. this.log("ERROR", `Error ${err.message}.`);
  61. });
  62. this.client.on("ready", () => {
  63. this.log("INFO", "Redis is ready.");
  64. if (this.getStatus() === "INITIALIZING") resolve();
  65. else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
  66. });
  67. this.client.connect().then(async () => {
  68. this.log("INFO", "Connected succesfully.");
  69. });
  70. // TODO move to a better place
  71. CacheModule.runJob("KEYS", { pattern: "longJobs.*" }).then(keys => {
  72. async.eachLimit(keys, 1, (key, next) => {
  73. CacheModule.runJob("DEL", { key }).finally(() => {
  74. next();
  75. });
  76. });
  77. });
  78. });
  79. }
  80. /**
  81. * Quits redis client
  82. *
  83. * @returns {Promise} - returns promise (reject, resolve)
  84. */
  85. QUIT() {
  86. return new Promise(resolve => {
  87. if (CacheModule.client.connected) {
  88. CacheModule.client.quit();
  89. Object.keys(pubs).forEach(channel => pubs[channel].quit());
  90. Object.keys(subs).forEach(channel => subs[channel].client.quit());
  91. }
  92. resolve();
  93. });
  94. }
  95. /**
  96. * Sets a single value
  97. *
  98. * @param {object} payload - object containing payload
  99. * @param {string} payload.key - name of the key to set
  100. * @param {*} payload.value - the value we want to set
  101. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  102. * @returns {Promise} - returns a promise (resolve, reject)
  103. */
  104. SET(payload) {
  105. return new Promise((resolve, reject) => {
  106. let { key } = payload;
  107. let { value } = payload;
  108. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  109. // automatically stringify objects and arrays into JSON
  110. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  111. CacheModule.client
  112. .SET(key, value)
  113. .then(() => {
  114. let parsed = value;
  115. try {
  116. parsed = JSON.parse(value);
  117. } catch {
  118. // Do nothing
  119. }
  120. resolve(parsed);
  121. })
  122. .catch(err => reject(new Error(err)));
  123. });
  124. }
  125. /**
  126. * Sets a single value in a table
  127. *
  128. * @param {object} payload - object containing payload
  129. * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)
  130. * @param {string} payload.key - name of the key to set
  131. * @param {*} payload.value - the value we want to set
  132. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  133. * @returns {Promise} - returns a promise (resolve, reject)
  134. */
  135. HSET(payload) {
  136. return new Promise((resolve, reject) => {
  137. let { key } = payload;
  138. let { value } = payload;
  139. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  140. // automatically stringify objects and arrays into JSON
  141. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  142. CacheModule.client
  143. .HSET(payload.table, key, value)
  144. .then(() => {
  145. let parsed = value;
  146. try {
  147. parsed = JSON.parse(value);
  148. } catch {
  149. // Do nothing
  150. }
  151. resolve(parsed);
  152. })
  153. .catch(err => reject(new Error(err)));
  154. });
  155. }
  156. /**
  157. * Gets a single value
  158. *
  159. * @param {object} payload - object containing payload
  160. * @param {string} payload.key - name of the key to fetch
  161. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  162. * @returns {Promise} - returns a promise (resolve, reject)
  163. */
  164. GET(payload) {
  165. return new Promise((resolve, reject) => {
  166. let { key } = payload;
  167. if (!key) {
  168. reject(new Error("Invalid key!"));
  169. return;
  170. }
  171. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  172. CacheModule.client
  173. .GET(key, payload.value)
  174. .then(value => {
  175. if (value && !value.startsWith("{") && !value.startsWith("[")) return resolve(value);
  176. let parsedValue;
  177. try {
  178. parsedValue = JSON.parse(value);
  179. } catch (err) {
  180. return reject(err);
  181. }
  182. return resolve(parsedValue);
  183. })
  184. .catch(err => reject(new Error(err)));
  185. });
  186. }
  187. /**
  188. * Gets a single value from a table
  189. *
  190. * @param {object} payload - object containing payload
  191. * @param {string} payload.table - name of the table to get the value from (table === redis hash)
  192. * @param {string} payload.key - name of the key to fetch
  193. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  194. * @returns {Promise} - returns a promise (resolve, reject)
  195. */
  196. HGET(payload) {
  197. return new Promise((resolve, reject) => {
  198. let { key } = payload;
  199. if (!key) {
  200. reject(new Error("Invalid key!"));
  201. return;
  202. }
  203. if (!payload.table) {
  204. reject(new Error("Invalid table!"));
  205. return;
  206. }
  207. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  208. CacheModule.client
  209. .HGET(payload.table, key, payload.value)
  210. .then(value => {
  211. let parsedValue;
  212. try {
  213. parsedValue = JSON.parse(value);
  214. } catch (err) {
  215. return reject(err);
  216. }
  217. return resolve(parsedValue);
  218. })
  219. .catch(err => reject(new Error(err)));
  220. });
  221. }
  222. /**
  223. * Deletes a single value from a table
  224. *
  225. * @param {object} payload - object containing payload
  226. * @param {string} payload.table - name of the table to delete the value from (table === redis hash)
  227. * @param {string} payload.key - name of the key to delete
  228. * @returns {Promise} - returns a promise (resolve, reject)
  229. */
  230. HDEL(payload) {
  231. return new Promise((resolve, reject) => {
  232. let { key } = payload;
  233. if (!payload.table) {
  234. reject(new Error("Invalid table!"));
  235. return;
  236. }
  237. if (!key) {
  238. reject(new Error("Invalid key!"));
  239. return;
  240. }
  241. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  242. CacheModule.client
  243. .HDEL(payload.table, key)
  244. .then(() => resolve())
  245. .catch(err => reject(new Error(err)));
  246. });
  247. }
  248. /**
  249. * Returns all the keys for a table
  250. *
  251. * @param {object} payload - object containing payload
  252. * @param {string} payload.table - name of the table to get the values from (table === redis hash)
  253. * @param {boolean} [payload.parseJson=true] - attempts to parse all values as JSON by default
  254. * @returns {Promise} - returns a promise (resolve, reject)
  255. */
  256. HGETALL(payload) {
  257. return new Promise((resolve, reject) => {
  258. if (!payload.table) {
  259. reject(new Error("Invalid table!"));
  260. return;
  261. }
  262. CacheModule.client
  263. .HGETALL(payload.table)
  264. .then(obj => {
  265. if (obj)
  266. Object.keys(obj).forEach(key => {
  267. obj[key] = JSON.parse(obj[key]);
  268. });
  269. else if (!obj) obj = [];
  270. resolve(obj);
  271. })
  272. .catch(err => reject(new Error(err)));
  273. });
  274. }
  275. /**
  276. * Deletes a single value
  277. *
  278. * @param {object} payload - object containing payload
  279. * @param {string} payload.key - name of the key to delete
  280. * @returns {Promise} - returns a promise (resolve, reject)
  281. */
  282. DEL(payload) {
  283. return new Promise((resolve, reject) => {
  284. let { key } = payload;
  285. if (!key) {
  286. reject(new Error("Invalid key!"));
  287. return;
  288. }
  289. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  290. CacheModule.client
  291. .DEL(key)
  292. .then(() => resolve())
  293. .catch(err => reject(new Error(err)));
  294. });
  295. }
  296. /**
  297. * Publish a message to a channel, caches the redis client connection
  298. *
  299. * @param {object} payload - object containing payload
  300. * @param {string} payload.channel - the name of the channel we want to publish a message to
  301. * @param {*} payload.value - the value we want to send
  302. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  303. * @returns {Promise} - returns a promise (resolve, reject)
  304. */
  305. PUB(payload) {
  306. return new Promise((resolve, reject) => {
  307. let { value } = payload;
  308. if (!payload.channel) {
  309. reject(new Error("Invalid channel!"));
  310. return;
  311. }
  312. if (!value) {
  313. reject(new Error("Invalid value!"));
  314. return;
  315. }
  316. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  317. CacheModule.client
  318. .publish(payload.channel, value)
  319. .then(() => resolve())
  320. .catch(err => reject(new Error(err)));
  321. });
  322. }
  323. /**
  324. * Subscribe to a channel, caches the redis client connection
  325. *
  326. * @param {object} payload - object containing payload
  327. * @param {string} payload.channel - name of the channel to subscribe to
  328. * @param {boolean} [payload.parseJson=true] - parse the message as JSON
  329. * @returns {Promise} - returns a promise (resolve, reject)
  330. */
  331. SUB(payload) {
  332. return new Promise((resolve, reject) => {
  333. if (!payload.channel) {
  334. reject(new Error("Invalid channel!"));
  335. return;
  336. }
  337. if (subs[payload.channel] === undefined) {
  338. subs[payload.channel] = {
  339. client: redis.createClient({
  340. url: CacheModule.url,
  341. password: CacheModule.password
  342. }),
  343. cbs: []
  344. };
  345. subs[payload.channel].client.connect().then(() => {
  346. subs[payload.channel].client.subscribe(payload.channel, (message, channel) => {
  347. if (message.startsWith("[") || message.startsWith("{"))
  348. try {
  349. message = JSON.parse(message);
  350. } catch (err) {
  351. console.error(err);
  352. }
  353. else if (message.startsWith('"') && message.endsWith('"'))
  354. message = message.substring(1).substring(0, message.length - 2);
  355. subs[channel].cbs.forEach(cb => cb(message));
  356. });
  357. });
  358. }
  359. subs[payload.channel].cbs.push(payload.cb);
  360. resolve();
  361. });
  362. }
  363. /**
  364. * Gets a full list from Redis
  365. *
  366. * @param {object} payload - object containing payload
  367. * @param {string} payload.key - name of the table to get the value from (table === redis hash)
  368. * @returns {Promise} - returns a promise (resolve, reject)
  369. */
  370. LRANGE(payload) {
  371. return new Promise((resolve, reject) => {
  372. let { key } = payload;
  373. if (!key) {
  374. reject(new Error("Invalid key!"));
  375. return;
  376. }
  377. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  378. CacheModule.client
  379. .LRANGE(key, 0, -1)
  380. .then(list => resolve(list))
  381. .catch(err => reject(new Error(err)));
  382. });
  383. }
  384. /**
  385. * Adds a value to a list in Redis
  386. *
  387. * @param {object} payload - object containing payload
  388. * @param {string} payload.key - name of the list
  389. * @param {*} payload.value - the value we want to set
  390. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  391. * @returns {Promise} - returns a promise (resolve, reject)
  392. */
  393. RPUSH(payload) {
  394. return new Promise((resolve, reject) => {
  395. let { key, value } = payload;
  396. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  397. // automatically stringify objects and arrays into JSON
  398. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  399. CacheModule.client
  400. .RPUSH(key, value)
  401. .then(() => resolve())
  402. .catch(err => reject(new Error(err)));
  403. });
  404. }
  405. /**
  406. * Adds a value to a list in Redis using LPUSH
  407. *
  408. * @param {object} payload - object containing payload
  409. * @param {string} payload.key - name of the list
  410. * @param {*} payload.value - the value we want to set
  411. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  412. * @returns {Promise} - returns a promise (resolve, reject)
  413. */
  414. LPUSH(payload) {
  415. return new Promise((resolve, reject) => {
  416. let { key, value } = payload;
  417. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  418. // automatically stringify objects and arrays into JSON
  419. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  420. CacheModule.client
  421. .LPUSH(key, value)
  422. .then(() => resolve())
  423. .catch(err => reject(new Error(err)));
  424. });
  425. }
  426. /**
  427. * Gets the length of a Redis list
  428. *
  429. * @param {object} payload - object containing payload
  430. * @param {string} payload.key - name of the list
  431. * @returns {Promise} - returns a promise (resolve, reject)
  432. */
  433. LLEN(payload) {
  434. return new Promise((resolve, reject) => {
  435. const { key } = payload;
  436. CacheModule.client
  437. .LLEN(key)
  438. .then(len => resolve(len))
  439. .catch(err => reject(new Error(err)));
  440. });
  441. }
  442. /**
  443. * Removes an item from a list using RPOP
  444. *
  445. * @param {object} payload - object containing payload
  446. * @param {string} payload.key - name of the list
  447. * @returns {Promise} - returns a promise (resolve, reject)
  448. */
  449. RPOP(payload) {
  450. return new Promise((resolve, reject) => {
  451. const { key } = payload;
  452. CacheModule.client
  453. .RPOP(key)
  454. .then(() => resolve())
  455. .catch(err => reject(new Error(err)));
  456. });
  457. }
  458. /**
  459. * Removes a value from a list in Redis
  460. *
  461. * @param {object} payload - object containing payload
  462. * @param {string} payload.key - name of the list
  463. * @param {*} payload.value - the value we want to remove
  464. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  465. * @returns {Promise} - returns a promise (resolve, reject)
  466. */
  467. LREM(payload) {
  468. return new Promise((resolve, reject) => {
  469. let { key, value } = payload;
  470. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  471. // automatically stringify objects and arrays into JSON
  472. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  473. CacheModule.client
  474. .LREM(key, 1, value)
  475. .then(() => resolve())
  476. .catch(err => reject(new Error(err)));
  477. });
  478. }
  479. /**
  480. * Gets a list of keys in Redis with a matching pattern
  481. *
  482. * @param {object} payload - object containing payload
  483. * @param {string} payload.pattern - pattern to search for
  484. * @returns {Promise} - returns a promise (resolve, reject)
  485. */
  486. KEYS(payload) {
  487. return new Promise((resolve, reject) => {
  488. const { pattern } = payload;
  489. CacheModule.client
  490. .KEYS(pattern)
  491. .then(keys => resolve(keys))
  492. .catch(err => reject(new Error(err)));
  493. });
  494. }
  495. /**
  496. * Returns a redis schema
  497. *
  498. * @param {object} payload - object containing the payload
  499. * @param {string} payload.schemaName - the name of the schema to get
  500. * @returns {Promise} - returns promise (reject, resolve)
  501. */
  502. GET_SCHEMA(payload) {
  503. return new Promise(resolve => {
  504. resolve(CacheModule.schemas[payload.schemaName]);
  505. });
  506. }
  507. }
  508. export default new _CacheModule();