index.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583
  1. import async from "async";
  2. import config from "config";
  3. import redis from "redis";
  4. import mongoose from "mongoose";
  5. import CoreClass from "../../core";
  6. // Lightweight / convenience wrapper around redis module for our needs
  7. const pubs = {};
  8. const subs = {};
  9. let CacheModule;
  10. class _CacheModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("cache");
  14. CacheModule = this;
  15. }
  16. /**
  17. * Initialises the cache/redis module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. const importSchema = schemaName =>
  23. new Promise(resolve => {
  24. import(`./schemas/${schemaName}`).then(schema => resolve(schema.default));
  25. });
  26. this.schemas = {
  27. session: await importSchema("session"),
  28. station: await importSchema("station"),
  29. playlist: await importSchema("playlist"),
  30. officialPlaylist: await importSchema("officialPlaylist"),
  31. song: await importSchema("song"),
  32. punishment: await importSchema("punishment"),
  33. recentActivity: await importSchema("recentActivity"),
  34. ratings: await importSchema("ratings")
  35. };
  36. return new Promise((resolve, reject) => {
  37. this.url = config.get("redis").url;
  38. this.password = config.get("redis").password;
  39. this.log("INFO", "Connecting...");
  40. this.client = redis.createClient({
  41. url: this.url,
  42. password: this.password,
  43. reconnectStrategy: retries => {
  44. if (this.getStatus() !== "LOCKDOWN") {
  45. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  46. this.log("INFO", `Attempting to reconnect.`);
  47. if (retries >= 10) {
  48. this.log("ERROR", `Stopped trying to reconnect.`);
  49. this.setStatus("FAILED");
  50. new Error("Stopped trying to reconnect.");
  51. } else {
  52. Math.min(retries * 50, 500);
  53. }
  54. }
  55. }
  56. });
  57. this.client.on("error", err => {
  58. if (this.getStatus() === "INITIALIZING") reject(err);
  59. if (this.getStatus() === "LOCKDOWN") return;
  60. this.log("ERROR", `Error ${err.message}.`);
  61. });
  62. this.client.on("ready", () => {
  63. this.log("INFO", "Redis is ready.");
  64. if (this.getStatus() === "INITIALIZING") resolve();
  65. else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
  66. });
  67. this.client.connect().then(async () => {
  68. this.log("INFO", "Connected succesfully.");
  69. });
  70. // TODO move to a better place
  71. CacheModule.runJob("KEYS", { pattern: "longJobs.*" }).then(keys => {
  72. async.eachLimit(keys, 1, (key, next) => {
  73. CacheModule.runJob("DEL", { key }).finally(() => {
  74. next();
  75. });
  76. });
  77. });
  78. });
  79. }
  80. /**
  81. * Quits redis client
  82. *
  83. * @returns {Promise} - returns promise (reject, resolve)
  84. */
  85. QUIT() {
  86. return new Promise(resolve => {
  87. if (CacheModule.client.connected) {
  88. CacheModule.client.quit();
  89. Object.keys(pubs).forEach(channel => pubs[channel].quit());
  90. Object.keys(subs).forEach(channel => subs[channel].client.quit());
  91. }
  92. resolve();
  93. });
  94. }
  95. /**
  96. * Sets a single value
  97. *
  98. * @param {object} payload - object containing payload
  99. * @param {string} payload.key - name of the key to set
  100. * @param {*} payload.value - the value we want to set
  101. * @param {number} payload.ttl - ttl of the key in seconds
  102. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  103. * @returns {Promise} - returns a promise (resolve, reject)
  104. */
  105. SET(payload) {
  106. return new Promise((resolve, reject) => {
  107. let { key, value } = payload;
  108. const { ttl } = payload;
  109. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  110. // automatically stringify objects and arrays into JSON
  111. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  112. let options = null;
  113. if (ttl) {
  114. options = {
  115. EX: ttl
  116. };
  117. }
  118. CacheModule.client
  119. .SET(key, value, options)
  120. .then(() => {
  121. let parsed = value;
  122. try {
  123. parsed = JSON.parse(value);
  124. } catch {
  125. // Do nothing
  126. }
  127. resolve(parsed);
  128. })
  129. .catch(err => reject(new Error(err)));
  130. });
  131. }
  132. /**
  133. * Sets a single value in a table
  134. *
  135. * @param {object} payload - object containing payload
  136. * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)
  137. * @param {string} payload.key - name of the key to set
  138. * @param {*} payload.value - the value we want to set
  139. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  140. * @returns {Promise} - returns a promise (resolve, reject)
  141. */
  142. HSET(payload) {
  143. return new Promise((resolve, reject) => {
  144. let { key } = payload;
  145. let { value } = payload;
  146. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  147. // automatically stringify objects and arrays into JSON
  148. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  149. CacheModule.client
  150. .HSET(payload.table, key, value)
  151. .then(() => resolve(JSON.parse(value)))
  152. .catch(err => reject(new Error(err)));
  153. });
  154. }
  155. /**
  156. * Gets a single value
  157. *
  158. * @param {object} payload - object containing payload
  159. * @param {string} payload.key - name of the key to fetch
  160. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  161. * @returns {Promise} - returns a promise (resolve, reject)
  162. */
  163. GET(payload) {
  164. return new Promise((resolve, reject) => {
  165. let { key } = payload;
  166. if (!key) {
  167. reject(new Error("Invalid key!"));
  168. return;
  169. }
  170. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  171. CacheModule.client
  172. .GET(key, payload.value)
  173. .then(value => {
  174. if (value && !value.startsWith("{") && !value.startsWith("[")) return resolve(value);
  175. let parsedValue;
  176. try {
  177. parsedValue = JSON.parse(value);
  178. } catch (err) {
  179. return reject(err);
  180. }
  181. return resolve(parsedValue);
  182. })
  183. .catch(err => reject(new Error(err)));
  184. });
  185. }
  186. /**
  187. * Gets a single value from a table
  188. *
  189. * @param {object} payload - object containing payload
  190. * @param {string} payload.table - name of the table to get the value from (table === redis hash)
  191. * @param {string} payload.key - name of the key to fetch
  192. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  193. * @returns {Promise} - returns a promise (resolve, reject)
  194. */
  195. HGET(payload) {
  196. return new Promise((resolve, reject) => {
  197. let { key } = payload;
  198. if (!key) {
  199. reject(new Error("Invalid key!"));
  200. return;
  201. }
  202. if (!payload.table) {
  203. reject(new Error("Invalid table!"));
  204. return;
  205. }
  206. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  207. CacheModule.client
  208. .HGET(payload.table, key, payload.value)
  209. .then(value => {
  210. let parsedValue;
  211. try {
  212. parsedValue = JSON.parse(value);
  213. } catch (err) {
  214. return reject(err);
  215. }
  216. return resolve(parsedValue);
  217. })
  218. .catch(err => reject(new Error(err)));
  219. });
  220. }
  221. /**
  222. * Deletes a single value from a table
  223. *
  224. * @param {object} payload - object containing payload
  225. * @param {string} payload.table - name of the table to delete the value from (table === redis hash)
  226. * @param {string} payload.key - name of the key to delete
  227. * @returns {Promise} - returns a promise (resolve, reject)
  228. */
  229. HDEL(payload) {
  230. return new Promise((resolve, reject) => {
  231. let { key } = payload;
  232. if (!payload.table) {
  233. reject(new Error("Invalid table!"));
  234. return;
  235. }
  236. if (!key) {
  237. reject(new Error("Invalid key!"));
  238. return;
  239. }
  240. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  241. CacheModule.client
  242. .HDEL(payload.table, key)
  243. .then(() => resolve())
  244. .catch(err => reject(new Error(err)));
  245. });
  246. }
  247. /**
  248. * Returns all the keys for a table
  249. *
  250. * @param {object} payload - object containing payload
  251. * @param {string} payload.table - name of the table to get the values from (table === redis hash)
  252. * @param {boolean} [payload.parseJson=true] - attempts to parse all values as JSON by default
  253. * @returns {Promise} - returns a promise (resolve, reject)
  254. */
  255. HGETALL(payload) {
  256. return new Promise((resolve, reject) => {
  257. if (!payload.table) {
  258. reject(new Error("Invalid table!"));
  259. return;
  260. }
  261. CacheModule.client
  262. .HGETALL(payload.table)
  263. .then(obj => {
  264. if (obj)
  265. Object.keys(obj).forEach(key => {
  266. obj[key] = JSON.parse(obj[key]);
  267. });
  268. else if (!obj) obj = [];
  269. resolve(obj);
  270. })
  271. .catch(err => reject(new Error(err)));
  272. });
  273. }
  274. /**
  275. * Deletes a single value
  276. *
  277. * @param {object} payload - object containing payload
  278. * @param {string} payload.key - name of the key to delete
  279. * @returns {Promise} - returns a promise (resolve, reject)
  280. */
  281. DEL(payload) {
  282. return new Promise((resolve, reject) => {
  283. let { key } = payload;
  284. if (!key) {
  285. reject(new Error("Invalid key!"));
  286. return;
  287. }
  288. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  289. CacheModule.client
  290. .DEL(key)
  291. .then(() => resolve())
  292. .catch(err => reject(new Error(err)));
  293. });
  294. }
  295. /**
  296. * Publish a message to a channel, caches the redis client connection
  297. *
  298. * @param {object} payload - object containing payload
  299. * @param {string} payload.channel - the name of the channel we want to publish a message to
  300. * @param {*} payload.value - the value we want to send
  301. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  302. * @returns {Promise} - returns a promise (resolve, reject)
  303. */
  304. PUB(payload) {
  305. return new Promise((resolve, reject) => {
  306. let { value } = payload;
  307. if (!payload.channel) {
  308. reject(new Error("Invalid channel!"));
  309. return;
  310. }
  311. if (!value) {
  312. reject(new Error("Invalid value!"));
  313. return;
  314. }
  315. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  316. CacheModule.client
  317. .publish(payload.channel, value)
  318. .then(() => resolve())
  319. .catch(err => reject(new Error(err)));
  320. });
  321. }
  322. /**
  323. * Subscribe to a channel, caches the redis client connection
  324. *
  325. * @param {object} payload - object containing payload
  326. * @param {string} payload.channel - name of the channel to subscribe to
  327. * @param {boolean} [payload.parseJson=true] - parse the message as JSON
  328. * @returns {Promise} - returns a promise (resolve, reject)
  329. */
  330. SUB(payload) {
  331. return new Promise((resolve, reject) => {
  332. if (!payload.channel) {
  333. reject(new Error("Invalid channel!"));
  334. return;
  335. }
  336. if (subs[payload.channel] === undefined) {
  337. subs[payload.channel] = {
  338. client: redis.createClient({
  339. url: CacheModule.url,
  340. password: CacheModule.password
  341. }),
  342. cbs: []
  343. };
  344. subs[payload.channel].client.connect().then(() => {
  345. subs[payload.channel].client.subscribe(payload.channel, (message, channel) => {
  346. if (message.startsWith("[") || message.startsWith("{"))
  347. try {
  348. message = JSON.parse(message);
  349. } catch (err) {
  350. console.error(err);
  351. }
  352. else if (message.startsWith('"') && message.endsWith('"'))
  353. message = message.substring(1).substring(0, message.length - 2);
  354. subs[channel].cbs.forEach(cb => cb(message));
  355. });
  356. });
  357. }
  358. subs[payload.channel].cbs.push(payload.cb);
  359. resolve();
  360. });
  361. }
  362. /**
  363. * Gets a full list from Redis
  364. *
  365. * @param {object} payload - object containing payload
  366. * @param {string} payload.key - name of the table to get the value from (table === redis hash)
  367. * @returns {Promise} - returns a promise (resolve, reject)
  368. */
  369. LRANGE(payload) {
  370. return new Promise((resolve, reject) => {
  371. let { key } = payload;
  372. if (!key) {
  373. reject(new Error("Invalid key!"));
  374. return;
  375. }
  376. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  377. CacheModule.client
  378. .LRANGE(key, 0, -1)
  379. .then(list => resolve(list))
  380. .catch(err => reject(new Error(err)));
  381. });
  382. }
  383. /**
  384. * Adds a value to a list in Redis
  385. *
  386. * @param {object} payload - object containing payload
  387. * @param {string} payload.key - name of the list
  388. * @param {*} payload.value - the value we want to set
  389. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  390. * @returns {Promise} - returns a promise (resolve, reject)
  391. */
  392. RPUSH(payload) {
  393. return new Promise((resolve, reject) => {
  394. let { key, value } = payload;
  395. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  396. // automatically stringify objects and arrays into JSON
  397. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  398. CacheModule.client
  399. .RPUSH(key, value)
  400. .then(() => resolve())
  401. .catch(err => reject(new Error(err)));
  402. });
  403. }
  404. /**
  405. * Adds a value to a list in Redis using LPUSH
  406. *
  407. * @param {object} payload - object containing payload
  408. * @param {string} payload.key - name of the list
  409. * @param {*} payload.value - the value we want to set
  410. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  411. * @returns {Promise} - returns a promise (resolve, reject)
  412. */
  413. LPUSH(payload) {
  414. return new Promise((resolve, reject) => {
  415. let { key, value } = payload;
  416. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  417. // automatically stringify objects and arrays into JSON
  418. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  419. CacheModule.client
  420. .LPUSH(key, value)
  421. .then(() => resolve())
  422. .catch(err => reject(new Error(err)));
  423. });
  424. }
  425. /**
  426. * Gets the length of a Redis list
  427. *
  428. * @param {object} payload - object containing payload
  429. * @param {string} payload.key - name of the list
  430. * @returns {Promise} - returns a promise (resolve, reject)
  431. */
  432. LLEN(payload) {
  433. return new Promise((resolve, reject) => {
  434. const { key } = payload;
  435. CacheModule.client
  436. .LLEN(key)
  437. .then(len => resolve(len))
  438. .catch(err => reject(new Error(err)));
  439. });
  440. }
  441. /**
  442. * Removes an item from a list using RPOP
  443. *
  444. * @param {object} payload - object containing payload
  445. * @param {string} payload.key - name of the list
  446. * @returns {Promise} - returns a promise (resolve, reject)
  447. */
  448. RPOP(payload) {
  449. return new Promise((resolve, reject) => {
  450. const { key } = payload;
  451. CacheModule.client
  452. .RPOP(key)
  453. .then(() => resolve())
  454. .catch(err => reject(new Error(err)));
  455. });
  456. }
  457. /**
  458. * Removes a value from a list in Redis
  459. *
  460. * @param {object} payload - object containing payload
  461. * @param {string} payload.key - name of the list
  462. * @param {*} payload.value - the value we want to remove
  463. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  464. * @returns {Promise} - returns a promise (resolve, reject)
  465. */
  466. LREM(payload) {
  467. return new Promise((resolve, reject) => {
  468. let { key, value } = payload;
  469. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  470. // automatically stringify objects and arrays into JSON
  471. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  472. CacheModule.client
  473. .LREM(key, 1, value)
  474. .then(() => resolve())
  475. .catch(err => reject(new Error(err)));
  476. });
  477. }
  478. /**
  479. * Gets a list of keys in Redis with a matching pattern
  480. *
  481. * @param {object} payload - object containing payload
  482. * @param {string} payload.pattern - pattern to search for
  483. * @returns {Promise} - returns a promise (resolve, reject)
  484. */
  485. KEYS(payload) {
  486. return new Promise((resolve, reject) => {
  487. const { pattern } = payload;
  488. CacheModule.client
  489. .KEYS(pattern)
  490. .then(keys => resolve(keys))
  491. .catch(err => reject(new Error(err)));
  492. });
  493. }
  494. /**
  495. * Returns a redis schema
  496. *
  497. * @param {object} payload - object containing the payload
  498. * @param {string} payload.schemaName - the name of the schema to get
  499. * @returns {Promise} - returns promise (reject, resolve)
  500. */
  501. GET_SCHEMA(payload) {
  502. return new Promise(resolve => {
  503. resolve(CacheModule.schemas[payload.schemaName]);
  504. });
  505. }
  506. }
  507. export default new _CacheModule();