notifications.js 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. import config from "config";
  2. import crypto from "crypto";
  3. import redis from "redis";
  4. import CoreClass from "../core";
  5. import utils from "./utils";
  /**
   * Derives the Redis key used for a notification name.
   *
   * The same derivation is used when scheduling, unscheduling and subscribing, so that the
   * key reported by an "expired" keyspace event can be matched against a subscription.
   *
   * @param {string} name - the name of the notification
   * @returns {string} - hex encoded MD5 digest of `_notification:<name>_`
   */
  function getNotificationKey(name) {
    return crypto.createHash("md5").update(`_notification:${name}_`).digest("hex");
  }

  /**
   * Notifications module.
   *
   * A notification is scheduled by writing an empty Redis key with an expiry time. A second
   * Redis client subscribes to the "expired" keyspace events of the database and, when the
   * expired key matches a subscription, calls the callback of that subscription.
   */
  class NotificationsModule extends CoreClass {
    /**
     * Creates the module with an empty list of subscriptions.
     */
    constructor() {
      super("notifications");

      // Each entry has the shape { originalName, name, cb }, see SUBSCRIBE.
      this.subscriptions = [];
    }

    /**
     * Creates the publish and subscribe Redis clients, wires up their event handlers and
     * subscribes to the "expired" keyspace events.
     *
     * @returns {Promise} - resolves as soon as one of the two clients has connected while the
     * module is still initializing, rejects if a client emits an error while initializing
     */
    initialize() {
      return new Promise((resolve, reject) => {
        const { url, password } = config.get("redis");
        this.url = url;
        this.password = password;

        // Shared by both clients. Returning a number asks the client to retry after that many
        // milliseconds; returning a non-number makes the client stop retrying. The original
        // strategy never returned a number, so no reconnect attempt was ever actually made.
        const retryStrategy = options => {
          if (this.getStatus() === "LOCKDOWN") return undefined;
          if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");

          this.log("INFO", `Attempting to reconnect.`);

          if (options.attempt >= 10) {
            this.log("ERROR", `Stopped trying to reconnect.`);
            this.setStatus("FAILED");
            return undefined;
          }

          // Linear back-off, capped at three seconds.
          return Math.min(options.attempt * 100, 3000);
        };

        const createClient = () => redis.createClient({ url, password, retry_strategy: retryStrategy });

        this.pub = createClient();
        this.sub = createClient();

        // Same handling for both clients: fail initialization, stay silent during lockdown.
        const onError = err => {
          if (this.getStatus() === "INITIALIZING") reject(err);
          if (this.getStatus() === "LOCKDOWN") return;

          this.log("ERROR", `Error ${err.message}.`);
        };

        this.sub.on("error", onError);
        this.pub.on("error", onError);

        this.sub.on("connect", () => {
          this.log("INFO", "Sub connected succesfully.");

          const status = this.getStatus();
          if (status === "INITIALIZING") resolve();
          else if (status === "LOCKDOWN" || status === "RECONNECTING") this.setStatus("READY");
        });

        this.pub.on("connect", () => {
          this.log("INFO", "Pub connected succesfully.");

          // Scheduled notifications only fire if Redis publishes "expired" keyspace events,
          // so report whether notify-keyspace-events has the expected value ("xE").
          this.pub.config("GET", "notify-keyspace-events", async (err, response) => {
            if (err) {
              const formattedErr = await utils.runJob("GET_ERROR", {
                error: err
              });

              this.log(
                "ERROR",
                "NOTIFICATIONS_INITIALIZE",
                `Getting notify-keyspace-events gave an error. ${formattedErr}`
              );
              this.log(
                "STATION_ISSUE",
                `Getting notify-keyspace-events gave an error. ${formattedErr}. ${response}`
              );
              return;
            }

            if (response[1] === "xE") {
              this.log("INFO", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is set correctly`);
              this.log("STATION_ISSUE", `notify-keyspace-events is set correctly`);
            } else {
              this.log(
                "ERROR",
                "NOTIFICATIONS_INITIALIZE",
                `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
              );
              this.log(
                "STATION_ISSUE",
                `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
              );
            }
          });

          const status = this.getStatus();
          if (status === "INITIALIZING") resolve();
          // NOTE(review): the sub client sets "READY" in this situation, "INITIALIZED" may be
          // unintended - confirm against the statuses CoreClass supports before changing it.
          else if (status === "LOCKDOWN" || status === "RECONNECTING") this.setStatus("INITIALIZED");
        });

        // An expired key is reported as the message; call every subscription registered for it.
        this.sub.on("pmessage", (pattern, channel, expiredKey) => {
          this.log(
            "STATION_ISSUE",
            `PMESSAGE1 - Pattern: ${pattern}; Channel: ${channel}; ExpiredKey: ${expiredKey}`
          );

          this.subscriptions.forEach(sub => {
            const isMatch = sub.name === expiredKey;

            this.log("STATION_ISSUE", `PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${isMatch}`);

            if (isMatch) sub.cb();
          });
        });

        this.sub.psubscribe(`__keyevent@${this.pub.options.db}__:expired`);
      });
    }

    /**
     * Schedules a notification to be dispatched in a specific amount of milliseconds,
     * notifications are unique by name, and the first one is always kept, as in
     * attempting to schedule a notification that already exists won't do anything
     *
     * @param {object} payload - object containing the payload
     * @param {string} payload.name - the name of the notification we want to schedule
     * @param {number} payload.time - how long in milliseconds until the notification should be fired
     * @param {object} [payload.station] - optional station, its _id and name are only used for logging
     * @returns {Promise} - returns a promise (resolve, reject)
     */
    SCHEDULE(payload) {
      return new Promise((resolve, reject) => {
        const time = Math.round(payload.time);
        const key = getNotificationKey(payload.name);
        // The station is only used for the debug log, so a missing station must not reject the job.
        const station = payload.station || {};

        this.log(
          "STATION_ISSUE",
          `SCHEDULE - Time: ${time}; Name: ${payload.name}; Key: ${key}; StationId: ${station._id}; StationName: ${station.name}`
        );

        // PX sets the expiry in milliseconds, NX only writes the key if it doesn't exist yet.
        this.pub.set(key, "", "PX", time, "NX", err => {
          if (err) reject(err);
          else resolve();
        });
      });
    }

    /**
     * Subscribes a callback function to be called when a notification gets called
     *
     * @param {object} payload - object containing the payload
     * @param {string} payload.name - the name of the notification we want to subscribe to
     * @param {Function} payload.cb - called without arguments when the notification fires
     * @param {boolean} payload.unique - only subscribe if another subscription with the same name doesn't already exist
     * @param {object} [payload.station] - optional station, its _id and name are only used for logging
     * @returns {Promise} - resolves with { subscription }, the existing subscription if unique is
     * set and one with the same name already exists, otherwise the newly created one
     */
    SUBSCRIBE(payload) {
      return new Promise(resolve => {
        const key = getNotificationKey(payload.name);
        // The station is only used for the debug log, so a missing station must not break subscribing.
        const station = payload.station || {};
        const existing = this.subscriptions.find(subscription => subscription.originalName === payload.name);

        this.log(
          "STATION_ISSUE",
          `SUBSCRIBE - Name: ${payload.name}; Key: ${key}, StationId: ${station._id}; StationName: ${station.name}; Unique: ${payload.unique}; SubscriptionExists: ${!!existing};`
        );

        if (payload.unique && existing) return resolve({ subscription: existing });

        const subscription = {
          originalName: payload.name,
          name: key,
          cb: payload.cb
        };

        this.subscriptions.push(subscription);

        return resolve({ subscription });
      });
    }

    /**
     * Remove a notification subscription
     *
     * @param {object} payload - object containing the payload
     * @param {object} payload.subscription - the subscription object returned by SUBSCRIBE
     * @returns {Promise} - always resolves, also when the subscription is not registered
     */
    REMOVE(payload) {
      return new Promise(resolve => {
        const index = this.subscriptions.indexOf(payload.subscription);

        // indexOf returns -1 when nothing is found and 0 for the first entry, so a truthiness
        // check would skip the first subscription and remove the last one for unknown input.
        if (index !== -1) this.subscriptions.splice(index, 1);

        resolve();
      });
    }

    /**
     * Cancels a scheduled notification by deleting its key before it expires
     *
     * @param {object} payload - object containing the payload
     * @param {string} payload.name - the name of the notification we want to unschedule
     * @returns {Promise} - returns a promise (resolve, reject)
     */
    UNSCHEDULE(payload) {
      return new Promise((resolve, reject) => {
        const key = getNotificationKey(payload.name);

        this.log("STATION_ISSUE", `UNSCHEDULE - Name: ${payload.name}; Key: ${key}`);

        this.pub.del(key, err => {
          if (err) reject(err);
          else resolve();
        });
      });
    }
  }
  // The module is exported as a single shared instance.
  const notificationsModule = new NotificationsModule();

  export default notificationsModule;