index.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. import async from "async";
  2. import config from "config";
  3. import redis from "redis";
  4. import mongoose from "mongoose";
  5. import CoreClass from "../../core";
  6. // Lightweight / convenience wrapper around redis module for our needs
  7. const pubs = {};
  8. const subs = {};
  9. let CacheModule;
  10. class _CacheModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("cache");
  14. CacheModule = this;
  15. }
  16. /**
  17. * Initialises the cache/redis module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. const importSchema = schemaName =>
  23. new Promise(resolve => {
  24. import(`./schemas/${schemaName}`).then(schema => resolve(schema.default));
  25. });
  26. this.schemas = {
  27. session: await importSchema("session"),
  28. station: await importSchema("station"),
  29. playlist: await importSchema("playlist"),
  30. officialPlaylist: await importSchema("officialPlaylist"),
  31. song: await importSchema("song"),
  32. punishment: await importSchema("punishment"),
  33. recentActivity: await importSchema("recentActivity"),
  34. ratings: await importSchema("ratings")
  35. };
  36. return new Promise((resolve, reject) => {
  37. this.url = config.get("redis").url;
  38. this.password = config.get("redis").password;
  39. this.log("INFO", "Connecting...");
  40. this.client = redis.createClient({
  41. url: this.url,
  42. password: this.password,
  43. retry_strategy: options => {
  44. if (this.getStatus() === "LOCKDOWN") return;
  45. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  46. this.log("INFO", `Attempting to reconnect.`);
  47. if (options.attempt >= 10) {
  48. this.log("ERROR", `Stopped trying to reconnect.`);
  49. this.setStatus("FAILED");
  50. }
  51. }
  52. });
  53. // TODO move to a better place
  54. CacheModule.runJob("KEYS", { pattern: "longJobs.*" }).then(keys => {
  55. async.eachLimit(keys, 1, (key, next) => {
  56. CacheModule.runJob("DEL", { key }).finally(() => {
  57. next();
  58. });
  59. });
  60. });
  61. this.client.on("error", err => {
  62. if (this.getStatus() === "INITIALIZING") reject(err);
  63. if (this.getStatus() === "LOCKDOWN") return;
  64. this.log("ERROR", `Error ${err.message}.`);
  65. });
  66. this.client.on("connect", () => {
  67. this.log("INFO", "Connected succesfully.");
  68. if (this.getStatus() === "INITIALIZING") resolve();
  69. else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
  70. });
  71. });
  72. }
  73. /**
  74. * Quits redis client
  75. *
  76. * @returns {Promise} - returns promise (reject, resolve)
  77. */
  78. QUIT() {
  79. return new Promise(resolve => {
  80. if (CacheModule.client.connected) {
  81. CacheModule.client.quit();
  82. Object.keys(pubs).forEach(channel => pubs[channel].quit());
  83. Object.keys(subs).forEach(channel => subs[channel].client.quit());
  84. }
  85. resolve();
  86. });
  87. }
  88. /**
  89. * Sets a single value in a table
  90. *
  91. * @param {object} payload - object containing payload
  92. * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)
  93. * @param {string} payload.key - name of the key to set
  94. * @param {*} payload.value - the value we want to set
  95. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  96. * @returns {Promise} - returns a promise (resolve, reject)
  97. */
  98. HSET(payload) {
  99. return new Promise((resolve, reject) => {
  100. let { key } = payload;
  101. let { value } = payload;
  102. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  103. // automatically stringify objects and arrays into JSON
  104. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  105. CacheModule.client.hset(payload.table, key, value, err => {
  106. if (err) return reject(new Error(err));
  107. return resolve(JSON.parse(value));
  108. });
  109. });
  110. }
  111. /**
  112. * Gets a single value from a table
  113. *
  114. * @param {object} payload - object containing payload
  115. * @param {string} payload.table - name of the table to get the value from (table === redis hash)
  116. * @param {string} payload.key - name of the key to fetch
  117. * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON
  118. * @returns {Promise} - returns a promise (resolve, reject)
  119. */
  120. HGET(payload) {
  121. return new Promise((resolve, reject) => {
  122. let { key } = payload;
  123. if (!key) {
  124. reject(new Error("Invalid key!"));
  125. return;
  126. }
  127. if (!payload.table) {
  128. reject(new Error("Invalid table!"));
  129. return;
  130. }
  131. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  132. CacheModule.client.hget(payload.table, key, (err, value) => {
  133. if (err) {
  134. reject(new Error(err));
  135. return;
  136. }
  137. try {
  138. value = JSON.parse(value);
  139. } catch (e) {
  140. reject(err);
  141. return;
  142. }
  143. resolve(value);
  144. });
  145. });
  146. }
  147. /**
  148. * Deletes a single value from a table
  149. *
  150. * @param {object} payload - object containing payload
  151. * @param {string} payload.table - name of the table to delete the value from (table === redis hash)
  152. * @param {string} payload.key - name of the key to delete
  153. * @returns {Promise} - returns a promise (resolve, reject)
  154. */
  155. HDEL(payload) {
  156. return new Promise((resolve, reject) => {
  157. let { key } = payload;
  158. if (!payload.table) {
  159. reject(new Error("Invalid table!"));
  160. return;
  161. }
  162. if (!key) {
  163. reject(new Error("Invalid key!"));
  164. return;
  165. }
  166. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  167. CacheModule.client.hdel(payload.table, key, err => {
  168. if (err) {
  169. reject(new Error(err));
  170. return;
  171. }
  172. resolve();
  173. });
  174. });
  175. }
  176. /**
  177. * Returns all the keys for a table
  178. *
  179. * @param {object} payload - object containing payload
  180. * @param {string} payload.table - name of the table to get the values from (table === redis hash)
  181. * @param {boolean} [payload.parseJson=true] - attempts to parse all values as JSON by default
  182. * @returns {Promise} - returns a promise (resolve, reject)
  183. */
  184. HGETALL(payload) {
  185. return new Promise((resolve, reject) => {
  186. if (!payload.table) {
  187. reject(new Error("Invalid table!"));
  188. return;
  189. }
  190. CacheModule.client.hgetall(payload.table, (err, obj) => {
  191. if (err) {
  192. reject(new Error(err));
  193. return;
  194. }
  195. if (obj)
  196. Object.keys(obj).forEach(key => {
  197. obj[key] = JSON.parse(obj[key]);
  198. });
  199. else if (!obj) obj = [];
  200. resolve(obj);
  201. });
  202. });
  203. }
  204. /**
  205. * Deletes a single value
  206. *
  207. * @param {object} payload - object containing payload
  208. * @param {string} payload.key - name of the key to delete
  209. * @returns {Promise} - returns a promise (resolve, reject)
  210. */
  211. DEL(payload) {
  212. return new Promise((resolve, reject) => {
  213. let { key } = payload;
  214. if (!key) {
  215. reject(new Error("Invalid key!"));
  216. return;
  217. }
  218. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  219. CacheModule.client.del(key, err => {
  220. if (err) {
  221. reject(new Error(err));
  222. return;
  223. }
  224. resolve();
  225. });
  226. });
  227. }
  228. /**
  229. * Publish a message to a channel, caches the redis client connection
  230. *
  231. * @param {object} payload - object containing payload
  232. * @param {string} payload.channel - the name of the channel we want to publish a message to
  233. * @param {*} payload.value - the value we want to send
  234. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  235. * @returns {Promise} - returns a promise (resolve, reject)
  236. */
  237. PUB(payload) {
  238. return new Promise((resolve, reject) => {
  239. let { value } = payload;
  240. if (!payload.channel) {
  241. reject(new Error("Invalid channel!"));
  242. return;
  243. }
  244. if (!value) {
  245. reject(new Error("Invalid value!"));
  246. return;
  247. }
  248. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  249. CacheModule.client.publish(payload.channel, value, err => {
  250. if (err) reject(err);
  251. else resolve();
  252. });
  253. });
  254. }
  255. /**
  256. * Subscribe to a channel, caches the redis client connection
  257. *
  258. * @param {object} payload - object containing payload
  259. * @param {string} payload.channel - name of the channel to subscribe to
  260. * @param {boolean} [payload.parseJson=true] - parse the message as JSON
  261. * @returns {Promise} - returns a promise (resolve, reject)
  262. */
  263. SUB(payload) {
  264. return new Promise((resolve, reject) => {
  265. if (!payload.channel) {
  266. reject(new Error("Invalid channel!"));
  267. return;
  268. }
  269. if (subs[payload.channel] === undefined) {
  270. subs[payload.channel] = {
  271. client: redis.createClient({
  272. url: CacheModule.url,
  273. password: CacheModule.password
  274. }),
  275. cbs: []
  276. };
  277. subs[payload.channel].client.on("message", (channel, message) => {
  278. if (message.startsWith("[") || message.startsWith("{"))
  279. try {
  280. message = JSON.parse(message);
  281. } catch (err) {
  282. console.error(err);
  283. }
  284. else if (message.startsWith('"') && message.endsWith('"'))
  285. message = message.substring(1).substring(0, message.length - 2);
  286. return subs[channel].cbs.forEach(cb => cb(message));
  287. });
  288. subs[payload.channel].client.subscribe(payload.channel);
  289. }
  290. subs[payload.channel].cbs.push(payload.cb);
  291. resolve();
  292. });
  293. }
  294. /**
  295. * Gets a full list from Redis
  296. *
  297. * @param {object} payload - object containing payload
  298. * @param {string} payload.key - name of the table to get the value from (table === redis hash)
  299. * @returns {Promise} - returns a promise (resolve, reject)
  300. */
  301. LRANGE(payload) {
  302. return new Promise((resolve, reject) => {
  303. let { key } = payload;
  304. if (!key) {
  305. reject(new Error("Invalid key!"));
  306. return;
  307. }
  308. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  309. CacheModule.client.LRANGE(key, 0, -1, (err, list) => {
  310. if (err) {
  311. reject(new Error(err));
  312. return;
  313. }
  314. resolve(list);
  315. });
  316. });
  317. }
  318. /**
  319. * Adds a value to a list in Redis
  320. *
  321. * @param {object} payload - object containing payload
  322. * @param {string} payload.key - name of the list
  323. * @param {*} payload.value - the value we want to set
  324. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  325. * @returns {Promise} - returns a promise (resolve, reject)
  326. */
  327. RPUSH(payload) {
  328. return new Promise((resolve, reject) => {
  329. let { key } = payload;
  330. let { value } = payload;
  331. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  332. // automatically stringify objects and arrays into JSON
  333. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  334. CacheModule.client.RPUSH(key, value, err => {
  335. if (err) return reject(new Error(err));
  336. return resolve();
  337. });
  338. });
  339. }
  340. /**
  341. * Removes a value from a list in Redis
  342. *
  343. * @param {object} payload - object containing payload
  344. * @param {string} payload.key - name of the list
  345. * @param {*} payload.value - the value we want to remove
  346. * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
  347. * @returns {Promise} - returns a promise (resolve, reject)
  348. */
  349. LREM(payload) {
  350. return new Promise((resolve, reject) => {
  351. let { key } = payload;
  352. let { value } = payload;
  353. if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
  354. // automatically stringify objects and arrays into JSON
  355. if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
  356. CacheModule.client.LREM(key, 1, value, err => {
  357. if (err) return reject(new Error(err));
  358. return resolve();
  359. });
  360. });
  361. }
  362. /**
  363. * Gets a list of keys in Redis with a matching pattern
  364. *
  365. * @param {object} payload - object containing payload
  366. * @param {string} payload.pattern - pattern to search for
  367. * @returns {Promise} - returns a promise (resolve, reject)
  368. */
  369. KEYS(payload) {
  370. return new Promise((resolve, reject) => {
  371. const { pattern } = payload;
  372. CacheModule.client.KEYS(pattern, (err, keys) => {
  373. if (err) return reject(new Error(err));
  374. return resolve(keys);
  375. });
  376. });
  377. }
  378. /**
  379. * Returns a redis schema
  380. *
  381. * @param {object} payload - object containing the payload
  382. * @param {string} payload.schemaName - the name of the schema to get
  383. * @returns {Promise} - returns promise (reject, resolve)
  384. */
  385. GET_SCHEMA(payload) {
  386. return new Promise(resolve => {
  387. resolve(CacheModule.schemas[payload.schemaName]);
  388. });
  389. }
  390. }
  391. export default new _CacheModule();