index.js 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. import config from "config";
  2. import redis from "redis";
  3. import mongoose from "mongoose";
  4. import CoreClass from "../../core";
  5. // Lightweight / convenience wrapper around redis module for our needs
  // Per-channel registries of redis connections, keyed by channel name.
  // They are created without a prototype so that a channel whose name matches an
  // inherited Object.prototype member (e.g. "constructor" or "toString") is not
  // mistaken for an already-registered entry by an `=== undefined` lookup.
  const pubs = Object.create(null);
  const subs = Object.create(null);
  /**
   * Converts a hash key to the form passed to redis: anything mongoose considers a
   * valid ObjectId is turned into its string form, every other key is passed through.
   *
   * @param {*} key - the key supplied by the caller
   * @returns {*} - the key to hand to the redis client
   */
  const toRedisKey = key => (mongoose.Types.ObjectId.isValid(key) ? key.toString() : key);

  /**
   * Serialises a value for storage/publishing: objects (including arrays and null,
   * which all report typeof "object") become JSON text, other values are left as-is.
   *
   * @param {*} value - the value supplied by the caller
   * @returns {*} - JSON text for objects, otherwise the untouched value
   */
  const serializeValue = value => (typeof value === "object" ? JSON.stringify(value) : value);

  /**
   * Parses a value as JSON, falling back to the value itself when it is not valid JSON.
   *
   * @param {*} value - the value to parse
   * @returns {*} - the parsed value, or the input when parsing fails
   */
  const parseJsonOrRaw = value => {
      try {
          return JSON.parse(value);
      } catch (err) {
          return value;
      }
  };

  /**
   * Cache module: a promise-based convenience wrapper around a redis client.
   * Hash commands treat a redis hash as a "table"; values are stored as JSON text.
   */
  class CacheModule extends CoreClass {
      constructor() {
          super("cache");
      }

      /**
       * Loads the cache schemas and opens the main redis connection using the
       * "redis" section of the config (url and password).
       *
       * @returns {Promise} - resolves on the first "connect" event while the module status is
       * "INITIALIZING"; rejects if a schema cannot be imported or if the client emits an
       * error while the status is still "INITIALIZING"
       */
      async initialize() {
          // Awaiting the dynamic import directly lets a failed import reject initialize()
          // instead of leaving it pending forever.
          const importSchema = async schemaName => {
              const schema = await import(`./schemas/${schemaName}`);
              return schema.default;
          };

          const schemaNames = ["session", "station", "playlist", "officialPlaylist", "song", "punishment"];
          // The schema files are independent of each other, so load them in parallel.
          const loadedSchemas = await Promise.all(schemaNames.map(schemaName => importSchema(schemaName)));

          this.schemas = {};
          schemaNames.forEach((schemaName, index) => {
              this.schemas[schemaName] = loadedSchemas[index];
          });

          return new Promise((resolve, reject) => {
              this.url = config.get("redis").url;
              this.password = config.get("redis").password;

              this.log("INFO", "Connecting...");

              this.client = redis.createClient({
                  url: this.url,
                  password: this.password,
                  // NOTE(review): every path of this strategy returns undefined. The node_redis
                  // documentation says a non-number return value stops further retries, so this
                  // may only log/update the status without actually reconnecting - confirm intent.
                  retry_strategy: options => {
                      if (this.getStatus() === "LOCKDOWN") return;
                      if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");

                      this.log("INFO", `Attempting to reconnect.`);

                      if (options.attempt >= 10) {
                          this.log("ERROR", `Stopped trying to reconnect.`);
                          this.setStatus("FAILED");
                      }
                  }
              });

              this.client.on("error", err => {
                  // An error during start-up fails initialization; it is still logged below.
                  if (this.getStatus() === "INITIALIZING") reject(err);
                  if (this.getStatus() === "LOCKDOWN") return;

                  this.log("ERROR", `Error ${err.message}.`);
              });

              this.client.on("connect", () => {
                  this.log("INFO", "Connected succesfully.");

                  const status = this.getStatus();
                  if (status === "INITIALIZING") resolve();
                  else if (status === "FAILED" || status === "RECONNECTING") this.setStatus("READY");
              });
          });
      }

      /**
       * Quits the main redis client and every cached publisher/subscriber client.
       * Nothing is closed when the main client does not exist or is not connected.
       *
       * @returns {Promise} - always resolves
       */
      QUIT() {
          return new Promise(resolve => {
              // Guard against QUIT being called before initialize() created the client.
              if (this.client && this.client.connected) {
                  this.client.quit();
                  Object.keys(pubs).forEach(channel => pubs[channel].quit());
                  Object.keys(subs).forEach(channel => subs[channel].client.quit());
              }
              resolve();
          });
      }

      /**
       * Sets a single value in a table
       *
       * @param {object} payload - object containing payload
       * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)
       * @param {string} payload.key - name of the key to set (valid ObjectIds are converted to strings)
       * @param {*} payload.value - the value we want to set; objects and arrays are always stored as JSON
       * @returns {Promise} - resolves with the stored value parsed back from JSON (or the stored value
       * itself when it is not valid JSON), rejects if redis reports an error
       */
      HSET(payload) {
          return new Promise((resolve, reject) => {
              const key = toRedisKey(payload.key);
              const value = serializeValue(payload.value);

              this.client.hset(payload.table, key, value, err => {
                  if (err) return reject(new Error(err));
                  // A plain (non-JSON) string must not throw here: the write already succeeded.
                  return resolve(parseJsonOrRaw(value));
              });
          });
      }

      /**
       * Gets a single value from a table
       *
       * @param {object} payload - object containing payload
       * @param {string} payload.table - name of the table to get the value from (table === redis hash)
       * @param {string} payload.key - name of the key to fetch (valid ObjectIds are converted to strings)
       * @returns {Promise} - resolves with the JSON-parsed value, rejects if redis reports an error
       * or if the stored value is not valid JSON
       */
      HGET(payload) {
          return new Promise((resolve, reject) => {
              const key = toRedisKey(payload.key);

              this.client.hget(payload.table, key, (err, value) => {
                  if (err) return reject(new Error(err));

                  try {
                      return resolve(JSON.parse(value));
                  } catch (parseError) {
                      // Reject with the actual parse failure (the redis `err` is null at this point).
                      return reject(parseError);
                  }
              });
          });
      }

      /**
       * Deletes a single value from a table
       *
       * @param {object} payload - object containing payload
       * @param {string} payload.table - name of the table to delete the value from (table === redis hash)
       * @param {string} payload.key - name of the key to delete (valid ObjectIds are converted to strings)
       * @returns {Promise} - resolves with no value, rejects if redis reports an error
       */
      HDEL(payload) {
          return new Promise((resolve, reject) => {
              const key = toRedisKey(payload.key);

              this.client.hdel(payload.table, key, err => {
                  if (err) return reject(new Error(err));
                  return resolve();
              });
          });
      }

      /**
       * Returns all the keys and values of a table
       *
       * @param {object} payload - object containing payload
       * @param {string} payload.table - name of the table to get the values from (table === redis hash)
       * @returns {Promise} - resolves with an object mapping each key to its JSON-parsed value, or with
       * an empty array when redis returns nothing for the table; rejects if redis reports an error or
       * if any stored value is not valid JSON
       */
      HGETALL(payload) {
          return new Promise((resolve, reject) => {
              this.client.hgetall(payload.table, (err, obj) => {
                  if (err) return reject(new Error(err));
                  // Kept as an empty array (not an object) for backward compatibility with callers.
                  if (!obj) return resolve([]);

                  try {
                      Object.keys(obj).forEach(key => {
                          obj[key] = JSON.parse(obj[key]);
                      });
                  } catch (parseError) {
                      // Surface corrupt entries as a rejection instead of throwing inside the callback.
                      return reject(parseError);
                  }

                  return resolve(obj);
              });
          });
      }

      /**
       * Publish a message to a channel using the main redis client
       *
       * @param {object} payload - object containing payload
       * @param {string} payload.channel - the name of the channel we want to publish a message to
       * @param {*} payload.value - the value we want to send; objects and arrays are always sent as JSON
       * @returns {Promise} - resolves with no value, rejects with the redis error
       */
      PUB(payload) {
          return new Promise((resolve, reject) => {
              const value = serializeValue(payload.value);

              this.client.publish(payload.channel, value, err => {
                  if (err) reject(err);
                  else resolve();
              });
          });
      }

      /**
       * Subscribe to a channel, caches the redis client connection per channel
       *
       * @param {object} payload - object containing payload
       * @param {string} payload.channel - name of the channel to subscribe to
       * @param {Function} payload.cb - called with every message received on the channel; the message
       * is JSON-parsed when possible, otherwise the raw message is passed
       * @returns {Promise} - always resolves once the callback has been registered
       */
      SUB(payload) {
          return new Promise(resolve => {
              if (subs[payload.channel] === undefined) {
                  const client = redis.createClient({
                      url: this.url,
                      password: this.password
                  });

                  subs[payload.channel] = { client, cbs: [] };

                  // Without a listener an "error" event emitted by this client would be thrown.
                  client.on("error", err => {
                      this.log("ERROR", `Error ${err.message}.`);
                  });

                  client.on("message", (channel, message) => {
                      try {
                          message = JSON.parse(message);
                      } catch (err) {
                          // Best effort: report the failure and deliver the raw message.
                          console.error(err);
                      }

                      return subs[channel].cbs.forEach(cb => cb(message));
                  });

                  client.subscribe(payload.channel);
              }

              subs[payload.channel].cbs.push(payload.cb);

              resolve();
          });
      }

      /**
       * Returns one of the schemas loaded by initialize()
       *
       * @param {object} payload - object containing payload
       * @param {string} payload.schemaName - name of the schema to return
       * @returns {Promise} - resolves with the schema, or undefined when no schema has that name
       */
      GET_SCHEMA(payload) {
          return new Promise(resolve => {
              resolve(this.schemas[payload.schemaName]);
          });
      }
  }
  // The module exposes one shared CacheModule instance as its default export.
  const cacheModule = new CacheModule();

  export default cacheModule;