index.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. import async from "async";
  2. import config from "config";
  3. import redis from "redis";
  4. import mongoose from "mongoose";
  5. import CoreClass from "../../core";
  6. // Lightweight / convenience wrapper around redis module for our needs
  7. const pubs = {};
  8. const subs = {};
  9. let CacheModule;
  10. class _CacheModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("cache");
  14. CacheModule = this;
  15. }
  16. /**
  17. * Initialises the cache/redis module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. const importSchema = schemaName =>
  23. new Promise(resolve => {
  24. import(`./schemas/${schemaName}`).then(schema => resolve(schema.default));
  25. });
  26. this.schemas = {
  27. session: await importSchema("session"),
  28. station: await importSchema("station"),
  29. playlist: await importSchema("playlist"),
  30. officialPlaylist: await importSchema("officialPlaylist"),
  31. song: await importSchema("song"),
  32. punishment: await importSchema("punishment"),
  33. recentActivity: await importSchema("recentActivity"),
  34. ratings: await importSchema("ratings")
  35. };
  36. return new Promise((resolve, reject) => {
  37. this.url = config.get("redis").url;
  38. this.password = config.get("redis").password;
  39. this.log("INFO", "Connecting...");
  40. this.client = redis.createClient({
  41. url: this.url,
  42. password: this.password,
  43. reconnectStrategy: retries => {
  44. if (this.getStatus() !== "LOCKDOWN") {
  45. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  46. this.log("INFO", `Attempting to reconnect.`);
  47. if (retries >= 10) {
  48. this.log("ERROR", `Stopped trying to reconnect.`);
  49. this.setStatus("FAILED");
  50. new Error("Stopped trying to reconnect.");
  51. } else {
  52. Math.min(retries * 50, 500);
  53. }
  54. }
  55. }
  56. });
  57. this.client.on("error", err => {
  58. if (this.getStatus() === "INITIALIZING") reject(err);
  59. if (this.getStatus() === "LOCKDOWN") return;
  60. this.log("ERROR", `Error ${err.message}.`);
  61. });
  62. this.client.on("ready", () => {
  63. this.log("INFO", "Redis is ready.");
  64. if (this.getStatus() === "INITIALIZING") resolve();
  65. else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
  66. });
  67. this.client.connect().then(async () => {
  68. this.log("INFO", "Connected succesfully.");
  69. });
  70. // TODO move to a better place
  71. CacheModule.runJob("KEYS", { pattern: "longJobs.*" }).then(keys => {
  72. async.eachLimit(keys, 1, (key, next) => {
  73. CacheModule.runJob("DEL", { key }).finally(() => {
  74. next();
  75. });
  76. });
  77. });
  78. });
  79. }
  80. /**
  81. * Quits redis client
  82. *
  83. * @returns {Promise} - returns promise (reject, resolve)
  84. */
  85. QUIT() {
  86. return new Promise(resolve => {
  87. if (CacheModule.client.connected) {
  88. CacheModule.client.quit();
  89. Object.keys(pubs).forEach(channel => pubs[channel].quit());
  90. Object.keys(subs).forEach(channel => subs[channel].client.quit());
  91. }
  92. resolve();
  93. });
  94. }
  95. /**
  96. * Sets a single value in a table
  97. *
  98. * @param {object} payload - object containing payload
  99. * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)
  100. * @param {string} payload.key - name of the key to set
  101. * @param {*} payload.value - the value we want to set
  102. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  103. * @returns {Promise} - returns a promise (resolve, reject)
  104. */
  105. HSET(payload) {
  106. return new Promise((resolve, reject) => {
  107. let { key } = payload;
  108. let { value } = payload;
  109. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  110. // automatically stringify objects and arrays into JSON
  111. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  112. CacheModule.client
  113. .HSET(payload.table, key, value)
  114. .then(() => resolve(JSON.parse(value)))
  115. .catch(err => reject(new Error(err)));
  116. });
  117. }
  118. /**
  119. * Gets a single value from a table
  120. *
  121. * @param {object} payload - object containing payload
  122. * @param {string} payload.table - name of the table to get the value from (table === redis hash)
  123. * @param {string} payload.key - name of the key to fetch
  124. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  125. * @returns {Promise} - returns a promise (resolve, reject)
  126. */
  127. HGET(payload) {
  128. return new Promise((resolve, reject) => {
  129. let { key } = payload;
  130. if (!key) {
  131. reject(new Error("Invalid key!"));
  132. return;
  133. }
  134. if (!payload.table) {
  135. reject(new Error("Invalid table!"));
  136. return;
  137. }
  138. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  139. CacheModule.client
  140. .HGET(payload.table, key, payload.value)
  141. .then(value => {
  142. let parsedValue;
  143. try {
  144. parsedValue = JSON.parse(value);
  145. } catch (err) {
  146. return reject(err);
  147. }
  148. return resolve(parsedValue);
  149. })
  150. .catch(err => reject(new Error(err)));
  151. });
  152. }
  153. /**
  154. * Deletes a single value from a table
  155. *
  156. * @param {object} payload - object containing payload
  157. * @param {string} payload.table - name of the table to delete the value from (table === redis hash)
  158. * @param {string} payload.key - name of the key to delete
  159. * @returns {Promise} - returns a promise (resolve, reject)
  160. */
  161. HDEL(payload) {
  162. return new Promise((resolve, reject) => {
  163. let { key } = payload;
  164. if (!payload.table) {
  165. reject(new Error("Invalid table!"));
  166. return;
  167. }
  168. if (!key) {
  169. reject(new Error("Invalid key!"));
  170. return;
  171. }
  172. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  173. CacheModule.client
  174. .HDEL(payload.table, key)
  175. .then(() => resolve())
  176. .catch(err => reject(new Error(err)));
  177. });
  178. }
  179. /**
  180. * Returns all the keys for a table
  181. *
  182. * @param {object} payload - object containing payload
  183. * @param {string} payload.table - name of the table to get the values from (table === redis hash)
  184. * @param {boolean} [payload.parseJson=true] - attempts to parse all values as JSON by default
  185. * @returns {Promise} - returns a promise (resolve, reject)
  186. */
  187. HGETALL(payload) {
  188. return new Promise((resolve, reject) => {
  189. if (!payload.table) {
  190. reject(new Error("Invalid table!"));
  191. return;
  192. }
  193. CacheModule.client
  194. .HGETALL(payload.table)
  195. .then(obj => {
  196. if (obj)
  197. Object.keys(obj).forEach(key => {
  198. obj[key] = JSON.parse(obj[key]);
  199. });
  200. else if (!obj) obj = [];
  201. resolve(obj);
  202. })
  203. .catch(err => reject(new Error(err)));
  204. });
  205. }
  206. /**
  207. * Deletes a single value
  208. *
  209. * @param {object} payload - object containing payload
  210. * @param {string} payload.key - name of the key to delete
  211. * @returns {Promise} - returns a promise (resolve, reject)
  212. */
  213. DEL(payload) {
  214. return new Promise((resolve, reject) => {
  215. let { key } = payload;
  216. if (!key) {
  217. reject(new Error("Invalid key!"));
  218. return;
  219. }
  220. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  221. CacheModule.client
  222. .DEL(key)
  223. .then(() => resolve())
  224. .catch(err => reject(new Error(err)));
  225. });
  226. }
  227. /**
  228. * Publish a message to a channel, caches the redis client connection
  229. *
  230. * @param {object} payload - object containing payload
  231. * @param {string} payload.channel - the name of the channel we want to publish a message to
  232. * @param {*} payload.value - the value we want to send
  233. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  234. * @returns {Promise} - returns a promise (resolve, reject)
  235. */
  236. PUB(payload) {
  237. return new Promise((resolve, reject) => {
  238. let { value } = payload;
  239. if (!payload.channel) {
  240. reject(new Error("Invalid channel!"));
  241. return;
  242. }
  243. if (!value) {
  244. reject(new Error("Invalid value!"));
  245. return;
  246. }
  247. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  248. CacheModule.client
  249. .publish(payload.channel, value)
  250. .then(() => resolve())
  251. .catch(err => reject(new Error(err)));
  252. });
  253. }
  254. /**
  255. * Subscribe to a channel, caches the redis client connection
  256. *
  257. * @param {object} payload - object containing payload
  258. * @param {string} payload.channel - name of the channel to subscribe to
  259. * @param {boolean} [payload.parseJson=true] - parse the message as JSON
  260. * @returns {Promise} - returns a promise (resolve, reject)
  261. */
  262. SUB(payload) {
  263. return new Promise((resolve, reject) => {
  264. if (!payload.channel) {
  265. reject(new Error("Invalid channel!"));
  266. return;
  267. }
  268. if (subs[payload.channel] === undefined) {
  269. subs[payload.channel] = {
  270. client: redis.createClient({
  271. url: CacheModule.url,
  272. password: CacheModule.password
  273. }),
  274. cbs: []
  275. };
  276. subs[payload.channel].client.connect().then(() => {
  277. subs[payload.channel].client.subscribe(payload.channel, (message, channel) => {
  278. if (message.startsWith("[") || message.startsWith("{"))
  279. try {
  280. message = JSON.parse(message);
  281. } catch (err) {
  282. console.error(err);
  283. }
  284. else if (message.startsWith('"') && message.endsWith('"'))
  285. message = message.substring(1).substring(0, message.length - 2);
  286. subs[channel].cbs.forEach(cb => cb(message));
  287. });
  288. });
  289. }
  290. subs[payload.channel].cbs.push(payload.cb);
  291. resolve();
  292. });
  293. }
  294. /**
  295. * Gets a full list from Redis
  296. *
  297. * @param {object} payload - object containing payload
  298. * @param {string} payload.key - name of the table to get the value from (table === redis hash)
  299. * @returns {Promise} - returns a promise (resolve, reject)
  300. */
  301. LRANGE(payload) {
  302. return new Promise((resolve, reject) => {
  303. let { key } = payload;
  304. if (!key) {
  305. reject(new Error("Invalid key!"));
  306. return;
  307. }
  308. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  309. CacheModule.client
  310. .LRANGE(key, 0, -1)
  311. .then(list => resolve(list))
  312. .catch(err => reject(new Error(err)));
  313. });
  314. }
  315. /**
  316. * Adds a value to a list in Redis
  317. *
  318. * @param {object} payload - object containing payload
  319. * @param {string} payload.key - name of the list
  320. * @param {*} payload.value - the value we want to set
  321. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  322. * @returns {Promise} - returns a promise (resolve, reject)
  323. */
  324. RPUSH(payload) {
  325. return new Promise((resolve, reject) => {
  326. let { key, value } = payload;
  327. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  328. // automatically stringify objects and arrays into JSON
  329. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  330. CacheModule.client
  331. .RPUSH(key, value)
  332. .then(() => resolve())
  333. .catch(err => reject(new Error(err)));
  334. });
  335. }
  336. /**
  337. * Adds a value to a list in Redis using LPUSH
  338. *
  339. * @param {object} payload - object containing payload
  340. * @param {string} payload.key - name of the list
  341. * @param {*} payload.value - the value we want to set
  342. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  343. * @returns {Promise} - returns a promise (resolve, reject)
  344. */
  345. LPUSH(payload) {
  346. return new Promise((resolve, reject) => {
  347. let { key, value } = payload;
  348. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  349. // automatically stringify objects and arrays into JSON
  350. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  351. CacheModule.client
  352. .LPUSH(key, value)
  353. .then(() => resolve())
  354. .catch(err => reject(new Error(err)));
  355. });
  356. }
  357. /**
  358. * Gets the length of a Redis list
  359. *
  360. * @param {object} payload - object containing payload
  361. * @param {string} payload.key - name of the list
  362. * @returns {Promise} - returns a promise (resolve, reject)
  363. */
  364. LLEN(payload) {
  365. return new Promise((resolve, reject) => {
  366. const { key } = payload;
  367. CacheModule.client
  368. .LLEN(key)
  369. .then(len => resolve(len))
  370. .catch(err => reject(new Error(err)));
  371. });
  372. }
  373. /**
  374. * Removes an item from a list using RPOP
  375. *
  376. * @param {object} payload - object containing payload
  377. * @param {string} payload.key - name of the list
  378. * @returns {Promise} - returns a promise (resolve, reject)
  379. */
  380. RPOP(payload) {
  381. return new Promise((resolve, reject) => {
  382. const { key } = payload;
  383. CacheModule.client
  384. .RPOP(key)
  385. .then(() => resolve())
  386. .catch(err => reject(new Error(err)));
  387. });
  388. }
  389. /**
  390. * Removes a value from a list in Redis
  391. *
  392. * @param {object} payload - object containing payload
  393. * @param {string} payload.key - name of the list
  394. * @param {*} payload.value - the value we want to remove
  395. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  396. * @returns {Promise} - returns a promise (resolve, reject)
  397. */
  398. LREM(payload) {
  399. return new Promise((resolve, reject) => {
  400. let { key, value } = payload;
  401. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  402. // automatically stringify objects and arrays into JSON
  403. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  404. CacheModule.client
  405. .LREM(key, 1, value)
  406. .then(() => resolve())
  407. .catch(err => reject(new Error(err)));
  408. });
  409. }
  410. /**
  411. * Gets a list of keys in Redis with a matching pattern
  412. *
  413. * @param {object} payload - object containing payload
  414. * @param {string} payload.pattern - pattern to search for
  415. * @returns {Promise} - returns a promise (resolve, reject)
  416. */
  417. KEYS(payload) {
  418. return new Promise((resolve, reject) => {
  419. const { pattern } = payload;
  420. CacheModule.client
  421. .KEYS(pattern)
  422. .then(keys => resolve(keys))
  423. .catch(err => reject(new Error(err)));
  424. });
  425. }
  426. /**
  427. * Returns a redis schema
  428. *
  429. * @param {object} payload - object containing the payload
  430. * @param {string} payload.schemaName - the name of the schema to get
  431. * @returns {Promise} - returns promise (reject, resolve)
  432. */
  433. GET_SCHEMA(payload) {
  434. return new Promise(resolve => {
  435. resolve(CacheModule.schemas[payload.schemaName]);
  436. });
  437. }
  438. }
  439. export default new _CacheModule();