notifications.js 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. import config from "config";
  2. import crypto from "crypto";
  3. import redis from "redis";
  4. import CoreClass from "../core";
  5. import utils from "./utils";
  6. class NotificationsModule extends CoreClass {
  7. // eslint-disable-next-line require-jsdoc
  8. constructor() {
  9. super("notifications");
  10. this.subscriptions = [];
  11. }
  12. /**
  13. * Initialises the notifications module
  14. *
  15. * @returns {Promise} - returns promise (reject, resolve)
  16. */
  17. initialize() {
  18. return new Promise((resolve, reject) => {
  19. const url = (this.url = config.get("redis").url);
  20. const password = (this.password = config.get("redis").password);
  21. this.pub = redis.createClient({
  22. url,
  23. password,
  24. retry_strategy: options => {
  25. if (this.getStatus() === "LOCKDOWN") return;
  26. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  27. this.log("INFO", `Attempting to reconnect.`);
  28. if (options.attempt >= 10) {
  29. this.log("ERROR", `Stopped trying to reconnect.`);
  30. this.setStatus("FAILED");
  31. // this.failed = true;
  32. // this._lockdown();
  33. }
  34. }
  35. });
  36. this.sub = redis.createClient({
  37. url,
  38. password,
  39. retry_strategy: options => {
  40. if (this.getStatus() === "LOCKDOWN") return;
  41. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  42. this.log("INFO", `Attempting to reconnect.`);
  43. if (options.attempt >= 10) {
  44. this.log("ERROR", `Stopped trying to reconnect.`);
  45. this.setStatus("FAILED");
  46. // this.failed = true;
  47. // this._lockdown();
  48. }
  49. }
  50. });
  51. this.sub.on("error", err => {
  52. if (this.getStatus() === "INITIALIZING") reject(err);
  53. if (this.getStatus() === "LOCKDOWN") return;
  54. this.log("ERROR", `Error ${err.message}.`);
  55. });
  56. this.pub.on("error", err => {
  57. if (this.getStatus() === "INITIALIZING") reject(err);
  58. if (this.getStatus() === "LOCKDOWN") return;
  59. this.log("ERROR", `Error ${err.message}.`);
  60. });
  61. this.sub.on("connect", () => {
  62. this.log("INFO", "Sub connected succesfully.");
  63. if (this.getStatus() === "INITIALIZING") resolve();
  64. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  65. this.setStatus("READY");
  66. });
  67. this.pub.on("connect", () => {
  68. this.log("INFO", "Pub connected succesfully.");
  69. this.pub.config("GET", "notify-keyspace-events", async (err, response) => {
  70. if (err) {
  71. const formattedErr = await utils.runJob("GET_ERROR", {
  72. error: err
  73. });
  74. this.log(
  75. "ERROR",
  76. "NOTIFICATIONS_INITIALIZE",
  77. `Getting notify-keyspace-events gave an error. ${formattedErr}`
  78. );
  79. this.log(
  80. "STATION_ISSUE",
  81. `Getting notify-keyspace-events gave an error. ${formattedErr}. ${response}`
  82. );
  83. return;
  84. }
  85. if (response[1] === "xE") {
  86. this.log("INFO", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is set correctly`);
  87. this.log("STATION_ISSUE", `notify-keyspace-events is set correctly`);
  88. } else {
  89. this.log(
  90. "ERROR",
  91. "NOTIFICATIONS_INITIALIZE",
  92. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  93. );
  94. this.log(
  95. "STATION_ISSUE",
  96. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  97. );
  98. }
  99. });
  100. if (this.getStatus() === "INITIALIZING") resolve();
  101. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  102. this.setStatus("INITIALIZED");
  103. });
  104. this.sub.on("pmessage", (pattern, channel, expiredKey) => {
  105. this.log(
  106. "STATION_ISSUE",
  107. `PMESSAGE1 - Pattern: ${pattern}; Channel: ${channel}; ExpiredKey: ${expiredKey}`
  108. );
  109. this.subscriptions.forEach(sub => {
  110. this.log(
  111. "STATION_ISSUE",
  112. `PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${!(sub.name !== expiredKey)}`
  113. );
  114. if (sub.name !== expiredKey) return;
  115. sub.cb();
  116. });
  117. });
  118. this.sub.psubscribe(`__keyevent@${this.pub.options.db}__:expired`);
  119. });
  120. }
  121. /**
  122. * Schedules a notification to be dispatched in a specific amount of milliseconds,
  123. * notifications are unique by name, and the first one is always kept, as in
  124. * attempting to schedule a notification that already exists won't do anything
  125. *
  126. * @param {object} payload - object containing the payload
  127. * @param {string} payload.name - the name of the notification we want to schedule
  128. * @param {number} payload.time - how long in milliseconds until the notification should be fired
  129. * @param {object} payload.station - the station object related to the notification
  130. * @returns {Promise} - returns a promise (resolve, reject)
  131. */
  132. SCHEDULE(payload) {
  133. return new Promise((resolve, reject) => {
  134. const time = Math.round(payload.time);
  135. this.log(
  136. "STATION_ISSUE",
  137. `SCHEDULE - Time: ${time}; Name: ${payload.name}; Key: ${crypto
  138. .createHash("md5")
  139. .update(`_notification:${payload.name}_`)
  140. .digest("hex")}; StationId: ${payload.station._id}; StationName: ${payload.station.name}`
  141. );
  142. this.pub.set(
  143. crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  144. "",
  145. "PX",
  146. time,
  147. "NX",
  148. err => {
  149. if (err) reject(err);
  150. else resolve();
  151. }
  152. );
  153. });
  154. }
  155. /**
  156. * Subscribes a callback function to be called when a notification gets called
  157. *
  158. * @param {object} payload - object containing the payload
  159. * @param {string} payload.name - the name of the notification we want to subscribe to
  160. * @param {boolean} payload.unique - only subscribe if another subscription with the same name doesn't already exist
  161. * @param {object} payload.station - the station object related to the notification
  162. * @returns {Promise} - returns a promise (resolve, reject)
  163. */
  164. SUBSCRIBE(payload) {
  165. return new Promise(resolve => {
  166. this.log(
  167. "STATION_ISSUE",
  168. `SUBSCRIBE - Name: ${payload.name}; Key: ${crypto
  169. .createHash("md5")
  170. .update(`_notification:${payload.name}_`)
  171. .digest("hex")}, StationId: ${payload.station._id}; StationName: ${payload.station.name}; Unique: ${
  172. payload.unique
  173. }; SubscriptionExists: ${!!this.subscriptions.find(
  174. subscription => subscription.originalName === payload.name
  175. )};`
  176. );
  177. if (payload.unique && !!this.subscriptions.find(subscription => subscription.originalName === payload.name))
  178. return resolve({
  179. subscription: this.subscriptions.find(subscription => subscription.originalName === payload.name)
  180. });
  181. const subscription = {
  182. originalName: payload.name,
  183. name: crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  184. cb: payload.cb
  185. };
  186. this.subscriptions.push(subscription);
  187. return resolve({ subscription });
  188. });
  189. }
  190. /**
  191. * Remove a notification subscription
  192. *
  193. * @param {object} payload - object containing the payload
  194. * @param {object} payload.subscription - the subscription object returned by {@link subscribe}
  195. * @returns {Promise} - returns a promise (resolve, reject)
  196. */
  197. REMOVE(payload) {
  198. // subscription
  199. return new Promise(resolve => {
  200. const index = this.subscriptions.indexOf(payload.subscription);
  201. if (index) this.subscriptions.splice(index, 1);
  202. resolve();
  203. });
  204. }
  205. /**
  206. * Unschedules a notification by name (each notification has a unique name)
  207. *
  208. * @param {object} payload - object containing the payload
  209. * @param {string} payload.name - the name of the notification we want to schedule
  210. * @returns {Promise} - returns a promise (resolve, reject)
  211. */
  212. UNSCHEDULE(payload) {
  213. // name
  214. return new Promise((resolve, reject) => {
  215. this.log(
  216. "STATION_ISSUE",
  217. `UNSCHEDULE - Name: ${payload.name}; Key: ${crypto
  218. .createHash("md5")
  219. .update(`_notification:${payload.name}_`)
  220. .digest("hex")}`
  221. );
  222. this.pub.del(crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"), err => {
  223. if (err) reject(err);
  224. else resolve();
  225. });
  226. });
  227. }
  228. }
  229. export default new NotificationsModule();