index.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. import async from "async";
  2. import config from "config";
  3. import redis from "redis";
  4. import mongoose from "mongoose";
  5. import CoreClass from "../../core";
  6. // Lightweight / convenience wrapper around the redis module for our needs
  // Publisher clients keyed by channel name. In the code visible here nothing adds entries;
  // the map is only iterated by QUIT to close whatever it contains.
  7. const pubs = {};
  // Subscriber state keyed by channel name. SUB stores `{ client, cbs }` per channel:
  // a dedicated redis client plus the list of callbacks to invoke for each message.
  8. const subs = {};
  // Singleton reference to the module instance. It is assigned in the _CacheModule
  // constructor and used by the job methods in place of `this`.
  9. let CacheModule;
  /**
   * Cache module: a promise-based convenience wrapper around a redis client.
   *
   * Every upper-case method takes a single `payload` object and returns a Promise.
   * The methods reference the module through the `CacheModule` singleton (assigned
   * in the constructor) rather than through `this`.
   */
  class _CacheModule extends CoreClass {
    // eslint-disable-next-line require-jsdoc
    constructor() {
      super("cache");

      CacheModule = this;
    }

    /**
     * Initialises the cache/redis module: loads the schemas, creates the main redis
     * client, connects it and removes any leftover `longJobs.*` keys.
     *
     * @returns {Promise} - resolves when the client reports "ready" while the module status is
     * INITIALIZING; rejects if a schema import fails, or on a client error / failed connect
     * during initialisation
     */
    async initialize() {
      // Return the import() promise itself so a failed import rejects initialize()
      // instead of leaving it pending forever.
      const importSchema = schemaName => import(`./schemas/${schemaName}`).then(schema => schema.default);

      this.schemas = {
        session: await importSchema("session"),
        station: await importSchema("station"),
        playlist: await importSchema("playlist"),
        officialPlaylist: await importSchema("officialPlaylist"),
        song: await importSchema("song"),
        punishment: await importSchema("punishment"),
        recentActivity: await importSchema("recentActivity"),
        ratings: await importSchema("ratings")
      };

      return new Promise((resolve, reject) => {
        this.url = config.get("redis").url;
        this.password = config.get("redis").password;

        this.log("INFO", "Connecting...");

        this.client = redis.createClient({
          url: this.url,
          password: this.password,
          // NOTE(review): node-redis v4 documents this option as `socket.reconnectStrategy`;
          // confirm that the top-level position used here is honoured by the installed version.
          reconnectStrategy: retries => {
            if (this.getStatus() === "LOCKDOWN") return undefined;

            if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");

            this.log("INFO", `Attempting to reconnect.`);

            if (retries >= 10) {
              this.log("ERROR", `Stopped trying to reconnect.`);

              this.setStatus("FAILED");

              // The strategy must RETURN the error for it to signal "give up".
              return new Error("Stopped trying to reconnect.");
            }

            // Linear backoff in milliseconds, capped at 500.
            return Math.min(retries * 50, 500);
          }
        });

        this.client.on("error", err => {
          if (this.getStatus() === "INITIALIZING") reject(err);
          if (this.getStatus() === "LOCKDOWN") return;

          this.log("ERROR", `Error ${err.message}.`);
        });

        this.client.on("ready", () => {
          this.log("INFO", "Redis is ready.");
          if (this.getStatus() === "INITIALIZING") resolve();
          else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
        });

        this.client
          .connect()
          .then(() => {
            this.log("INFO", "Connected succesfully.");
          })
          .catch(err => {
            // Without this handler a failed connect would surface as an unhandled rejection.
            this.log("ERROR", `Failed to connect. ${err.message}`);
            reject(err);
          });

        // TODO move to a better place
        // Best-effort cleanup of leftover long job keys, deleted one at a time.
        CacheModule.runJob("KEYS", { pattern: "longJobs.*" })
          .then(keys => {
            async.eachLimit(keys, 1, (key, next) => {
              CacheModule.runJob("DEL", { key })
                .catch(err => {
                  this.log("ERROR", `Failed to delete key ${key}. ${err.message}`);
                })
                .finally(() => {
                  next();
                });
            });
          })
          .catch(err => {
            this.log("ERROR", `Failed to list long job keys. ${err.message}`);
          });
      });
    }

    /**
     * Quits the main redis client and every publisher/subscriber client.
     * The quit commands are issued without waiting for them to complete.
     *
     * @returns {Promise} - resolves immediately after the quit commands have been issued
     */
    QUIT() {
      return new Promise(resolve => {
        // node-redis v4 exposes `isOpen`; the v3 `connected` flag does not exist on v4 clients.
        if (CacheModule.client.isOpen) {
          const clients = [
            CacheModule.client,
            ...Object.keys(pubs).map(channel => pubs[channel]),
            ...Object.keys(subs).map(channel => subs[channel].client)
          ];

          clients.forEach(client => {
            // quit() may reject (e.g. a subscriber that never finished connecting); log instead of
            // leaving a floating rejection.
            Promise.resolve()
              .then(() => client.quit())
              .catch(err => {
                CacheModule.log("ERROR", `Failed to quit redis client. ${err.message}`);
              });
          });
        }

        resolve();
      });
    }

    /**
     * Sets a single value in a table
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)
     * @param {string} payload.key - name of the key to set
     * @param {*} payload.value - the value we want to set; objects and arrays are stored as JSON
     * @param {boolean} [payload.stringifyJson=true] - currently not read; objects/arrays are always stringified
     * @returns {Promise} - resolves with the stored value after a JSON.parse attempt (the stored
     * string itself if it is not valid JSON); rejects if the redis command fails
     */
    HSET(payload) {
      return new Promise((resolve, reject) => {
        let { key } = payload;
        let { value } = payload;

        if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
        // automatically stringify objects and arrays into JSON (typeof reports "object" for arrays too)
        if (typeof value === "object") value = JSON.stringify(value);

        CacheModule.client
          .HSET(payload.table, key, value)
          .then(() => {
            let parsed = value;
            try {
              parsed = JSON.parse(value);
            } catch {
              // Not JSON: resolve with the value exactly as it was stored
            }

            resolve(parsed);
          })
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Gets a single value from a table
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.table - name of the table to get the value from (table === redis hash)
     * @param {string} payload.key - name of the key to fetch
     * @param {boolean} [payload.parseJson=true] - currently not read; the value is always parsed as JSON
     * @returns {Promise} - resolves with the JSON-parsed value; rejects on a missing table/key,
     * a value that is not valid JSON, or a failed redis command
     */
    HGET(payload) {
      return new Promise((resolve, reject) => {
        let { key } = payload;

        if (!key) {
          reject(new Error("Invalid key!"));
          return;
        }
        if (!payload.table) {
          reject(new Error("Invalid table!"));
          return;
        }
        if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();

        CacheModule.client
          // HGET takes exactly (hash, field)
          .HGET(payload.table, key)
          .then(value => {
            let parsedValue;
            try {
              parsedValue = JSON.parse(value);
            } catch (err) {
              return reject(err);
            }

            return resolve(parsedValue);
          })
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Deletes a single value from a table
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.table - name of the table to delete the value from (table === redis hash)
     * @param {string} payload.key - name of the key to delete
     * @returns {Promise} - resolves with no value; rejects on a missing table/key or a failed redis command
     */
    HDEL(payload) {
      return new Promise((resolve, reject) => {
        let { key } = payload;

        if (!payload.table) {
          reject(new Error("Invalid table!"));
          return;
        }
        if (!key) {
          reject(new Error("Invalid key!"));
          return;
        }
        if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();

        CacheModule.client
          .HDEL(payload.table, key)
          .then(() => resolve())
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Returns all the key/value pairs of a table
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.table - name of the table to get the values from (table === redis hash)
     * @param {boolean} [payload.parseJson=true] - currently not read; every value is always parsed as JSON
     * @returns {Promise} - resolves with an object of JSON-parsed values, or an empty array when the
     * client returns a falsy result; rejects on a missing table, unparsable value or failed redis command
     */
    HGETALL(payload) {
      return new Promise((resolve, reject) => {
        if (!payload.table) {
          reject(new Error("Invalid table!"));
          return;
        }

        CacheModule.client
          .HGETALL(payload.table)
          .then(obj => {
            if (!obj) {
              resolve([]);
              return;
            }

            // A JSON.parse failure here throws and is turned into a rejection by the catch below.
            Object.keys(obj).forEach(key => {
              obj[key] = JSON.parse(obj[key]);
            });

            resolve(obj);
          })
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Deletes a single value
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.key - name of the key to delete
     * @returns {Promise} - resolves with no value; rejects on a missing key or a failed redis command
     */
    DEL(payload) {
      return new Promise((resolve, reject) => {
        let { key } = payload;

        if (!key) {
          reject(new Error("Invalid key!"));
          return;
        }
        if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();

        CacheModule.client
          .DEL(key)
          .then(() => resolve())
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Publish a message to a channel using the main redis client
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.channel - the name of the channel we want to publish a message to
     * @param {*} payload.value - the value we want to send (must be truthy); objects and arrays are sent as JSON
     * @param {boolean} [payload.stringifyJson=true] - currently not read; objects/arrays are always stringified
     * @returns {Promise} - resolves with no value; rejects on a missing channel/value or a failed redis command
     */
    PUB(payload) {
      return new Promise((resolve, reject) => {
        let { value } = payload;

        if (!payload.channel) {
          reject(new Error("Invalid channel!"));
          return;
        }
        if (!value) {
          reject(new Error("Invalid value!"));
          return;
        }
        if (typeof value === "object") value = JSON.stringify(value);

        CacheModule.client
          .publish(payload.channel, value)
          .then(() => resolve())
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Subscribe to a channel. A dedicated redis client is created for the first subscription to a
     * channel and reused for later ones; every registered callback receives each message.
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.channel - name of the channel to subscribe to
     * @param {Function} payload.cb - called with each message; messages starting with "[" or "{" are
     * JSON-parsed first and messages wrapped in double quotes have the quotes removed
     * @param {boolean} [payload.parseJson=true] - currently not read; parsing depends only on the message content
     * @returns {Promise} - resolves once the callback is registered (not once the subscription is
     * active); rejects on a missing channel
     */
    SUB(payload) {
      return new Promise((resolve, reject) => {
        const { channel } = payload;

        if (!channel) {
          reject(new Error("Invalid channel!"));
          return;
        }

        if (subs[channel] === undefined) {
          const client = redis.createClient({
            url: CacheModule.url,
            password: CacheModule.password
          });

          subs[channel] = { client, cbs: [] };

          // node-redis requires an "error" listener on every client; without one an emitted
          // error is thrown instead of being reported.
          client.on("error", err => {
            if (CacheModule.getStatus() === "LOCKDOWN") return;

            CacheModule.log("ERROR", `Subscriber error on channel ${channel}: ${err.message}.`);
          });

          client
            .connect()
            .then(() =>
              client.subscribe(channel, (message, messageChannel) => {
                let parsed = message;

                if (message.startsWith("[") || message.startsWith("{")) {
                  try {
                    parsed = JSON.parse(message);
                  } catch (err) {
                    // Deliver the raw message when it only looked like JSON
                    console.error(err);
                  }
                } else if (message.startsWith('"') && message.endsWith('"')) {
                  // Strip the surrounding double quotes
                  parsed = message.slice(1, -1);
                }

                subs[messageChannel].cbs.forEach(cb => cb(parsed));
              })
            )
            .catch(err => {
              CacheModule.log("ERROR", `Failed to subscribe to channel ${channel}: ${err.message}.`);
            });
        }

        subs[channel].cbs.push(payload.cb);

        resolve();
      });
    }

    /**
     * Gets a full list from Redis
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.key - name of the list
     * @returns {Promise} - resolves with every element of the list (range 0 to -1); rejects on a
     * missing key or a failed redis command
     */
    LRANGE(payload) {
      return new Promise((resolve, reject) => {
        let { key } = payload;

        if (!key) {
          reject(new Error("Invalid key!"));
          return;
        }
        if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();

        CacheModule.client
          .LRANGE(key, 0, -1)
          .then(list => resolve(list))
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Adds a value to the end of a list in Redis using RPUSH
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.key - name of the list
     * @param {*} payload.value - the value we want to add; objects and arrays are stored as JSON
     * @param {boolean} [payload.stringifyJson=true] - currently not read; objects/arrays are always stringified
     * @returns {Promise} - resolves with no value; rejects if the redis command fails
     */
    RPUSH(payload) {
      return new Promise((resolve, reject) => {
        let { key, value } = payload;

        if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
        // automatically stringify objects and arrays into JSON
        if (typeof value === "object") value = JSON.stringify(value);

        CacheModule.client
          .RPUSH(key, value)
          .then(() => resolve())
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Adds a value to the start of a list in Redis using LPUSH
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.key - name of the list
     * @param {*} payload.value - the value we want to add; objects and arrays are stored as JSON
     * @param {boolean} [payload.stringifyJson=true] - currently not read; objects/arrays are always stringified
     * @returns {Promise} - resolves with no value; rejects if the redis command fails
     */
    LPUSH(payload) {
      return new Promise((resolve, reject) => {
        let { key, value } = payload;

        if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
        // automatically stringify objects and arrays into JSON
        if (typeof value === "object") value = JSON.stringify(value);

        CacheModule.client
          .LPUSH(key, value)
          .then(() => resolve())
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Gets the length of a Redis list
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.key - name of the list
     * @returns {Promise} - resolves with the list length; rejects if the redis command fails
     */
    LLEN(payload) {
      return new Promise((resolve, reject) => {
        const { key } = payload;

        CacheModule.client
          .LLEN(key)
          .then(len => resolve(len))
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Removes an item from a list using RPOP
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.key - name of the list
     * @returns {Promise} - resolves with no value (the popped item is discarded); rejects if the redis command fails
     */
    RPOP(payload) {
      return new Promise((resolve, reject) => {
        const { key } = payload;

        CacheModule.client
          .RPOP(key)
          .then(() => resolve())
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Removes one occurrence of a value from a list in Redis
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.key - name of the list
     * @param {*} payload.value - the value we want to remove; objects and arrays are matched as JSON
     * @param {boolean} [payload.stringifyJson=true] - currently not read; objects/arrays are always stringified
     * @returns {Promise} - resolves with no value; rejects if the redis command fails
     */
    LREM(payload) {
      return new Promise((resolve, reject) => {
        let { key, value } = payload;

        if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
        // automatically stringify objects and arrays into JSON
        if (typeof value === "object") value = JSON.stringify(value);

        CacheModule.client
          // count of 1: remove a single matching element
          .LREM(key, 1, value)
          .then(() => resolve())
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Gets a list of keys in Redis with a matching pattern
     *
     * @param {object} payload - object containing payload
     * @param {string} payload.pattern - pattern to search for
     * @returns {Promise} - resolves with the matching keys; rejects if the redis command fails
     */
    KEYS(payload) {
      return new Promise((resolve, reject) => {
        const { pattern } = payload;

        CacheModule.client
          .KEYS(pattern)
          .then(keys => resolve(keys))
          .catch(err => reject(new Error(err)));
      });
    }

    /**
     * Returns a redis schema
     *
     * @param {object} payload - object containing the payload
     * @param {string} payload.schemaName - the name of the schema to get
     * @returns {Promise} - resolves with the schema loaded during initialize(), or undefined for an unknown name
     */
    GET_SCHEMA(payload) {
      return new Promise(resolve => {
        resolve(CacheModule.schemas[payload.schemaName]);
      });
    }
  }
  // Export a single shared instance of the module. Constructing it runs the
  // _CacheModule constructor, which also assigns the module-level CacheModule reference.
  447. export default new _CacheModule();