index.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
  1. import async from "async";
  2. import config from "config";
  3. import redis from "redis";
  4. import mongoose from "mongoose";
  5. import CoreClass from "../../core";
  6. // Lightweight / convenience wrapper around redis module for our needs
  7. const pubs = {};
  8. const subs = {};
  9. let CacheModule;
  10. class _CacheModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("cache");
  14. CacheModule = this;
  15. }
  16. /**
  17. * Initialises the cache/redis module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. const importSchema = schemaName =>
  23. new Promise(resolve => {
  24. import(`./schemas/${schemaName}`).then(schema => resolve(schema.default));
  25. });
  26. this.schemas = {
  27. session: await importSchema("session"),
  28. station: await importSchema("station"),
  29. playlist: await importSchema("playlist"),
  30. officialPlaylist: await importSchema("officialPlaylist"),
  31. song: await importSchema("song"),
  32. punishment: await importSchema("punishment"),
  33. recentActivity: await importSchema("recentActivity"),
  34. ratings: await importSchema("ratings")
  35. };
  36. return new Promise((resolve, reject) => {
  37. this.url = config.get("redis").url;
  38. this.password = config.get("redis").password;
  39. this.log("INFO", "Connecting...");
  40. this.client = redis.createClient({
  41. url: this.url,
  42. password: this.password,
  43. reconnectStrategy: retries => {
  44. if (this.getStatus() !== "LOCKDOWN") {
  45. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  46. this.log("INFO", `Attempting to reconnect.`);
  47. if (retries >= 10) {
  48. this.log("ERROR", `Stopped trying to reconnect.`);
  49. this.setStatus("FAILED");
  50. new Error("Stopped trying to reconnect.");
  51. } else {
  52. Math.min(retries * 50, 500);
  53. }
  54. }
  55. }
  56. });
  57. this.client.on("error", err => {
  58. if (this.getStatus() === "INITIALIZING") reject(err);
  59. if (this.getStatus() === "LOCKDOWN") return;
  60. this.log("ERROR", `Error ${err.message}.`);
  61. });
  62. this.client.on("ready", () => {
  63. this.log("INFO", "Redis is ready.");
  64. if (this.getStatus() === "INITIALIZING") resolve();
  65. else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
  66. });
  67. this.client.connect().then(async () => {
  68. this.log("INFO", "Connected succesfully.");
  69. });
  70. // TODO move to a better place
  71. CacheModule.runJob("KEYS", { pattern: "longJobs.*" }).then(keys => {
  72. async.eachLimit(keys, 1, (key, next) => {
  73. CacheModule.runJob("DEL", { key }).finally(() => {
  74. next();
  75. });
  76. });
  77. });
  78. });
  79. }
  80. /**
  81. * Quits redis client
  82. *
  83. * @returns {Promise} - returns promise (reject, resolve)
  84. */
  85. QUIT() {
  86. return new Promise(resolve => {
  87. if (CacheModule.client.connected) {
  88. CacheModule.client.quit();
  89. Object.keys(pubs).forEach(channel => pubs[channel].quit());
  90. Object.keys(subs).forEach(channel => subs[channel].client.quit());
  91. }
  92. resolve();
  93. });
  94. }
  95. /**
  96. * Sets a single value
  97. *
  98. * @param {object} payload - object containing payload
  99. * @param {string} payload.key - name of the key to set
  100. * @param {*} payload.value - the value we want to set
  101. * @param {number} payload.ttl - ttl of the key in seconds
  102. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  103. * @returns {Promise} - returns a promise (resolve, reject)
  104. */
  105. SET(payload) {
  106. return new Promise((resolve, reject) => {
  107. let { key, value } = payload;
  108. const { ttl } = payload;
  109. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  110. // automatically stringify objects and arrays into JSON
  111. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  112. let options = null;
  113. if (ttl) {
  114. options = {
  115. EX: ttl
  116. };
  117. }
  118. CacheModule.client
  119. .SET(key, value, options)
  120. .then(() => {
  121. let parsed = value;
  122. try {
  123. parsed = JSON.parse(value);
  124. } catch {
  125. // Do nothing
  126. }
  127. resolve(parsed);
  128. })
  129. .catch(err => reject(new Error(err)));
  130. });
  131. }
  132. /**
  133. * Sets a single value in a table
  134. *
  135. * @param {object} payload - object containing payload
  136. * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)
  137. * @param {string} payload.key - name of the key to set
  138. * @param {*} payload.value - the value we want to set
  139. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  140. * @returns {Promise} - returns a promise (resolve, reject)
  141. */
  142. HSET(payload) {
  143. return new Promise((resolve, reject) => {
  144. let { key } = payload;
  145. let { value } = payload;
  146. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  147. // automatically stringify objects and arrays into JSON
  148. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  149. CacheModule.client
  150. .HSET(payload.table, key, value)
  151. .then(() => {
  152. let parsed = value;
  153. try {
  154. parsed = JSON.parse(value);
  155. } catch {
  156. // Do nothing
  157. }
  158. resolve(parsed);
  159. })
  160. .catch(err => reject(new Error(err)));
  161. });
  162. }
  163. /**
  164. * Gets a single value
  165. *
  166. * @param {object} payload - object containing payload
  167. * @param {string} payload.key - name of the key to fetch
  168. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  169. * @returns {Promise} - returns a promise (resolve, reject)
  170. */
  171. GET(payload) {
  172. return new Promise((resolve, reject) => {
  173. let { key } = payload;
  174. if (!key) {
  175. reject(new Error("Invalid key!"));
  176. return;
  177. }
  178. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  179. CacheModule.client
  180. .GET(key, payload.value)
  181. .then(value => {
  182. if (value && !value.startsWith("{") && !value.startsWith("[")) return resolve(value);
  183. let parsedValue;
  184. try {
  185. parsedValue = JSON.parse(value);
  186. } catch (err) {
  187. return reject(err);
  188. }
  189. return resolve(parsedValue);
  190. })
  191. .catch(err => reject(new Error(err)));
  192. });
  193. }
  194. /**
  195. * Gets a single value from a table
  196. *
  197. * @param {object} payload - object containing payload
  198. * @param {string} payload.table - name of the table to get the value from (table === redis hash)
  199. * @param {string} payload.key - name of the key to fetch
  200. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  201. * @returns {Promise} - returns a promise (resolve, reject)
  202. */
  203. HGET(payload) {
  204. return new Promise((resolve, reject) => {
  205. let { key } = payload;
  206. if (!key) {
  207. reject(new Error("Invalid key!"));
  208. return;
  209. }
  210. if (!payload.table) {
  211. reject(new Error("Invalid table!"));
  212. return;
  213. }
  214. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  215. CacheModule.client
  216. .HGET(payload.table, key, payload.value)
  217. .then(value => {
  218. let parsedValue;
  219. try {
  220. parsedValue = JSON.parse(value);
  221. } catch (err) {
  222. return reject(err);
  223. }
  224. return resolve(parsedValue);
  225. })
  226. .catch(err => reject(new Error(err)));
  227. });
  228. }
  229. /**
  230. * Deletes a single value from a table
  231. *
  232. * @param {object} payload - object containing payload
  233. * @param {string} payload.table - name of the table to delete the value from (table === redis hash)
  234. * @param {string} payload.key - name of the key to delete
  235. * @returns {Promise} - returns a promise (resolve, reject)
  236. */
  237. HDEL(payload) {
  238. return new Promise((resolve, reject) => {
  239. let { key } = payload;
  240. if (!payload.table) {
  241. reject(new Error("Invalid table!"));
  242. return;
  243. }
  244. if (!key) {
  245. reject(new Error("Invalid key!"));
  246. return;
  247. }
  248. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  249. CacheModule.client
  250. .HDEL(payload.table, key)
  251. .then(() => resolve())
  252. .catch(err => reject(new Error(err)));
  253. });
  254. }
  255. /**
  256. * Returns all the keys for a table
  257. *
  258. * @param {object} payload - object containing payload
  259. * @param {string} payload.table - name of the table to get the values from (table === redis hash)
  260. * @param {boolean} [payload.parseJson=true] - attempts to parse all values as JSON by default
  261. * @returns {Promise} - returns a promise (resolve, reject)
  262. */
  263. HGETALL(payload) {
  264. return new Promise((resolve, reject) => {
  265. if (!payload.table) {
  266. reject(new Error("Invalid table!"));
  267. return;
  268. }
  269. CacheModule.client
  270. .HGETALL(payload.table)
  271. .then(obj => {
  272. if (obj)
  273. Object.keys(obj).forEach(key => {
  274. obj[key] = JSON.parse(obj[key]);
  275. });
  276. else if (!obj) obj = [];
  277. resolve(obj);
  278. })
  279. .catch(err => reject(new Error(err)));
  280. });
  281. }
  282. /**
  283. * Deletes a single value
  284. *
  285. * @param {object} payload - object containing payload
  286. * @param {string} payload.key - name of the key to delete
  287. * @returns {Promise} - returns a promise (resolve, reject)
  288. */
  289. DEL(payload) {
  290. return new Promise((resolve, reject) => {
  291. let { key } = payload;
  292. if (!key) {
  293. reject(new Error("Invalid key!"));
  294. return;
  295. }
  296. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  297. CacheModule.client
  298. .DEL(key)
  299. .then(() => resolve())
  300. .catch(err => reject(new Error(err)));
  301. });
  302. }
  303. /**
  304. * Publish a message to a channel, caches the redis client connection
  305. *
  306. * @param {object} payload - object containing payload
  307. * @param {string} payload.channel - the name of the channel we want to publish a message to
  308. * @param {*} payload.value - the value we want to send
  309. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  310. * @returns {Promise} - returns a promise (resolve, reject)
  311. */
  312. PUB(payload) {
  313. return new Promise((resolve, reject) => {
  314. let { value } = payload;
  315. if (!payload.channel) {
  316. reject(new Error("Invalid channel!"));
  317. return;
  318. }
  319. if (!value) {
  320. reject(new Error("Invalid value!"));
  321. return;
  322. }
  323. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  324. CacheModule.client
  325. .publish(payload.channel, value)
  326. .then(() => resolve())
  327. .catch(err => reject(new Error(err)));
  328. });
  329. }
  330. /**
  331. * Subscribe to a channel, caches the redis client connection
  332. *
  333. * @param {object} payload - object containing payload
  334. * @param {string} payload.channel - name of the channel to subscribe to
  335. * @param {boolean} [payload.parseJson=true] - parse the message as JSON
  336. * @returns {Promise} - returns a promise (resolve, reject)
  337. */
  338. SUB(payload) {
  339. return new Promise((resolve, reject) => {
  340. if (!payload.channel) {
  341. reject(new Error("Invalid channel!"));
  342. return;
  343. }
  344. if (subs[payload.channel] === undefined) {
  345. subs[payload.channel] = {
  346. client: redis.createClient({
  347. url: CacheModule.url,
  348. password: CacheModule.password
  349. }),
  350. cbs: []
  351. };
  352. subs[payload.channel].client.connect().then(() => {
  353. subs[payload.channel].client.subscribe(payload.channel, (message, channel) => {
  354. if (message.startsWith("[") || message.startsWith("{"))
  355. try {
  356. message = JSON.parse(message);
  357. } catch (err) {
  358. console.error(err);
  359. }
  360. else if (message.startsWith('"') && message.endsWith('"'))
  361. message = message.substring(1).substring(0, message.length - 2);
  362. subs[channel].cbs.forEach(cb => cb(message));
  363. });
  364. });
  365. }
  366. subs[payload.channel].cbs.push(payload.cb);
  367. resolve();
  368. });
  369. }
  370. /**
  371. * Gets a full list from Redis
  372. *
  373. * @param {object} payload - object containing payload
  374. * @param {string} payload.key - name of the table to get the value from (table === redis hash)
  375. * @returns {Promise} - returns a promise (resolve, reject)
  376. */
  377. LRANGE(payload) {
  378. return new Promise((resolve, reject) => {
  379. let { key } = payload;
  380. if (!key) {
  381. reject(new Error("Invalid key!"));
  382. return;
  383. }
  384. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  385. CacheModule.client
  386. .LRANGE(key, 0, -1)
  387. .then(list => resolve(list))
  388. .catch(err => reject(new Error(err)));
  389. });
  390. }
  391. /**
  392. * Adds a value to a list in Redis
  393. *
  394. * @param {object} payload - object containing payload
  395. * @param {string} payload.key - name of the list
  396. * @param {*} payload.value - the value we want to set
  397. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  398. * @returns {Promise} - returns a promise (resolve, reject)
  399. */
  400. RPUSH(payload) {
  401. return new Promise((resolve, reject) => {
  402. let { key, value } = payload;
  403. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  404. // automatically stringify objects and arrays into JSON
  405. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  406. CacheModule.client
  407. .RPUSH(key, value)
  408. .then(() => resolve())
  409. .catch(err => reject(new Error(err)));
  410. });
  411. }
  412. /**
  413. * Adds a value to a list in Redis using LPUSH
  414. *
  415. * @param {object} payload - object containing payload
  416. * @param {string} payload.key - name of the list
  417. * @param {*} payload.value - the value we want to set
  418. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  419. * @returns {Promise} - returns a promise (resolve, reject)
  420. */
  421. LPUSH(payload) {
  422. return new Promise((resolve, reject) => {
  423. let { key, value } = payload;
  424. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  425. // automatically stringify objects and arrays into JSON
  426. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  427. CacheModule.client
  428. .LPUSH(key, value)
  429. .then(() => resolve())
  430. .catch(err => reject(new Error(err)));
  431. });
  432. }
  433. /**
  434. * Gets the length of a Redis list
  435. *
  436. * @param {object} payload - object containing payload
  437. * @param {string} payload.key - name of the list
  438. * @returns {Promise} - returns a promise (resolve, reject)
  439. */
  440. LLEN(payload) {
  441. return new Promise((resolve, reject) => {
  442. const { key } = payload;
  443. CacheModule.client
  444. .LLEN(key)
  445. .then(len => resolve(len))
  446. .catch(err => reject(new Error(err)));
  447. });
  448. }
  449. /**
  450. * Removes an item from a list using RPOP
  451. *
  452. * @param {object} payload - object containing payload
  453. * @param {string} payload.key - name of the list
  454. * @returns {Promise} - returns a promise (resolve, reject)
  455. */
  456. RPOP(payload) {
  457. return new Promise((resolve, reject) => {
  458. const { key } = payload;
  459. CacheModule.client
  460. .RPOP(key)
  461. .then(() => resolve())
  462. .catch(err => reject(new Error(err)));
  463. });
  464. }
  465. /**
  466. * Removes a value from a list in Redis
  467. *
  468. * @param {object} payload - object containing payload
  469. * @param {string} payload.key - name of the list
  470. * @param {*} payload.value - the value we want to remove
  471. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  472. * @returns {Promise} - returns a promise (resolve, reject)
  473. */
  474. LREM(payload) {
  475. return new Promise((resolve, reject) => {
  476. let { key, value } = payload;
  477. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  478. // automatically stringify objects and arrays into JSON
  479. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  480. CacheModule.client
  481. .LREM(key, 1, value)
  482. .then(() => resolve())
  483. .catch(err => reject(new Error(err)));
  484. });
  485. }
  486. /**
  487. * Gets a list of keys in Redis with a matching pattern
  488. *
  489. * @param {object} payload - object containing payload
  490. * @param {string} payload.pattern - pattern to search for
  491. * @returns {Promise} - returns a promise (resolve, reject)
  492. */
  493. KEYS(payload) {
  494. return new Promise((resolve, reject) => {
  495. const { pattern } = payload;
  496. CacheModule.client
  497. .KEYS(pattern)
  498. .then(keys => resolve(keys))
  499. .catch(err => reject(new Error(err)));
  500. });
  501. }
  502. /**
  503. * Returns a redis schema
  504. *
  505. * @param {object} payload - object containing the payload
  506. * @param {string} payload.schemaName - the name of the schema to get
  507. * @returns {Promise} - returns promise (reject, resolve)
  508. */
  509. GET_SCHEMA(payload) {
  510. return new Promise(resolve => {
  511. resolve(CacheModule.schemas[payload.schemaName]);
  512. });
  513. }
  514. }
  515. export default new _CacheModule();