notifications.js 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. import config from "config";
  2. import crypto from "crypto";
  3. import redis from "redis";
  4. import CoreClass from "../core";
  5. let NotificationsModule;
  6. let UtilsModule;
  7. class _NotificationsModule extends CoreClass {
  8. // eslint-disable-next-line require-jsdoc
  9. constructor() {
  10. super("notifications");
  11. this.subscriptions = [];
  12. NotificationsModule = this;
  13. }
  14. /**
  15. * Initialises the notifications module
  16. *
  17. * @returns {Promise} - returns promise (reject, resolve)
  18. */
  19. initialize() {
  20. return new Promise((resolve, reject) => {
  21. const url = (this.url = config.get("redis").url);
  22. const password = (this.password = config.get("redis").password);
  23. UtilsModule = this.moduleManager.modules.utils;
  24. this.pub = redis.createClient({
  25. url,
  26. password,
  27. retry_strategy: options => {
  28. if (this.getStatus() === "LOCKDOWN") return;
  29. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  30. this.log("INFO", `Attempting to reconnect.`);
  31. if (options.attempt >= 10) {
  32. this.log("ERROR", `Stopped trying to reconnect.`);
  33. this.setStatus("FAILED");
  34. // this.failed = true;
  35. // this._lockdown();
  36. }
  37. }
  38. });
  39. this.sub = redis.createClient({
  40. url,
  41. password,
  42. retry_strategy: options => {
  43. if (this.getStatus() === "LOCKDOWN") return;
  44. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  45. this.log("INFO", `Attempting to reconnect.`);
  46. if (options.attempt >= 10) {
  47. this.log("ERROR", `Stopped trying to reconnect.`);
  48. this.setStatus("FAILED");
  49. // this.failed = true;
  50. // this._lockdown();
  51. }
  52. }
  53. });
  54. this.sub.on("error", err => {
  55. if (this.getStatus() === "INITIALIZING") reject(err);
  56. if (this.getStatus() === "LOCKDOWN") return;
  57. this.log("ERROR", `Error ${err.message}.`);
  58. });
  59. this.pub.on("error", err => {
  60. if (this.getStatus() === "INITIALIZING") reject(err);
  61. if (this.getStatus() === "LOCKDOWN") return;
  62. this.log("ERROR", `Error ${err.message}.`);
  63. });
  64. this.sub.on("connect", () => {
  65. this.log("INFO", "Sub connected succesfully.");
  66. if (this.getStatus() === "INITIALIZING") resolve();
  67. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  68. this.setStatus("READY");
  69. });
  70. this.pub.on("connect", () => {
  71. this.log("INFO", "Pub connected succesfully.");
  72. this.pub.config("GET", "notify-keyspace-events", async (err, response) => {
  73. if (err) {
  74. const formattedErr = await UtilsModule.runJob(
  75. "GET_ERROR",
  76. {
  77. error: err
  78. },
  79. this
  80. );
  81. this.log(
  82. "ERROR",
  83. "NOTIFICATIONS_INITIALIZE",
  84. `Getting notify-keyspace-events gave an error. ${formattedErr}`
  85. );
  86. this.log(
  87. "STATION_ISSUE",
  88. `Getting notify-keyspace-events gave an error. ${formattedErr}. ${response}`
  89. );
  90. return;
  91. }
  92. if (response[1] === "xE") {
  93. this.log("INFO", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is set correctly`);
  94. this.log("STATION_ISSUE", `notify-keyspace-events is set correctly`);
  95. } else {
  96. this.log(
  97. "ERROR",
  98. "NOTIFICATIONS_INITIALIZE",
  99. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  100. );
  101. this.log(
  102. "STATION_ISSUE",
  103. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  104. );
  105. }
  106. });
  107. if (this.getStatus() === "INITIALIZING") resolve();
  108. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  109. this.setStatus("INITIALIZED");
  110. });
  111. this.sub.on("pmessage", (pattern, channel, expiredKey) => {
  112. this.log(
  113. "STATION_ISSUE",
  114. `PMESSAGE1 - Pattern: ${pattern}; Channel: ${channel}; ExpiredKey: ${expiredKey}`
  115. );
  116. this.subscriptions.forEach(sub => {
  117. this.log(
  118. "STATION_ISSUE",
  119. `PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${!(sub.name !== expiredKey)}`
  120. );
  121. if (sub.name !== expiredKey) return;
  122. sub.cb();
  123. });
  124. });
  125. this.sub.psubscribe(`__keyevent@${this.pub.options.db}__:expired`);
  126. });
  127. }
  128. /**
  129. * Schedules a notification to be dispatched in a specific amount of milliseconds,
  130. * notifications are unique by name, and the first one is always kept, as in
  131. * attempting to schedule a notification that already exists won't do anything
  132. *
  133. * @param {object} payload - object containing the payload
  134. * @param {string} payload.name - the name of the notification we want to schedule
  135. * @param {number} payload.time - how long in milliseconds until the notification should be fired
  136. * @param {object} payload.station - the station object related to the notification
  137. * @returns {Promise} - returns a promise (resolve, reject)
  138. */
  139. SCHEDULE(payload) {
  140. return new Promise((resolve, reject) => {
  141. const time = Math.round(payload.time);
  142. NotificationsModule.log(
  143. "STATION_ISSUE",
  144. `SCHEDULE - Time: ${time}; Name: ${payload.name}; Key: ${crypto
  145. .createHash("md5")
  146. .update(`_notification:${payload.name}_`)
  147. .digest("hex")}; StationId: ${payload.station._id}; StationName: ${payload.station.name}`
  148. );
  149. NotificationsModule.pub.set(
  150. crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  151. "",
  152. "PX",
  153. time,
  154. "NX",
  155. err => {
  156. if (err) reject(err);
  157. else resolve();
  158. }
  159. );
  160. });
  161. }
  162. /**
  163. * Subscribes a callback function to be called when a notification gets called
  164. *
  165. * @param {object} payload - object containing the payload
  166. * @param {string} payload.name - the name of the notification we want to subscribe to
  167. * @param {boolean} payload.unique - only subscribe if another subscription with the same name doesn't already exist
  168. * @param {object} payload.station - the station object related to the notification
  169. * @returns {Promise} - returns a promise (resolve, reject)
  170. */
  171. SUBSCRIBE(payload) {
  172. return new Promise(resolve => {
  173. NotificationsModule.log(
  174. "STATION_ISSUE",
  175. `SUBSCRIBE - Name: ${payload.name}; Key: ${crypto
  176. .createHash("md5")
  177. .update(`_notification:${payload.name}_`)
  178. .digest("hex")}, StationId: ${payload.station._id}; StationName: ${payload.station.name}; Unique: ${
  179. payload.unique
  180. }; SubscriptionExists: ${!!NotificationsModule.subscriptions.find(
  181. subscription => subscription.originalName === payload.name
  182. )};`
  183. );
  184. if (
  185. payload.unique &&
  186. !!NotificationsModule.subscriptions.find(subscription => subscription.originalName === payload.name)
  187. )
  188. return resolve({
  189. subscription: NotificationsModule.subscriptions.find(
  190. subscription => subscription.originalName === payload.name
  191. )
  192. });
  193. const subscription = {
  194. originalName: payload.name,
  195. name: crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  196. cb: payload.cb
  197. };
  198. NotificationsModule.subscriptions.push(subscription);
  199. return resolve({ subscription });
  200. });
  201. }
  202. /**
  203. * Remove a notification subscription
  204. *
  205. * @param {object} payload - object containing the payload
  206. * @param {object} payload.subscription - the subscription object returned by {@link subscribe}
  207. * @returns {Promise} - returns a promise (resolve, reject)
  208. */
  209. REMOVE(payload) {
  210. // subscription
  211. return new Promise(resolve => {
  212. const index = NotificationsModule.subscriptions.indexOf(payload.subscription);
  213. if (index) NotificationsModule.subscriptions.splice(index, 1);
  214. resolve();
  215. });
  216. }
  217. /**
  218. * Unschedules a notification by name (each notification has a unique name)
  219. *
  220. * @param {object} payload - object containing the payload
  221. * @param {string} payload.name - the name of the notification we want to schedule
  222. * @returns {Promise} - returns a promise (resolve, reject)
  223. */
  224. UNSCHEDULE(payload) {
  225. // name
  226. return new Promise((resolve, reject) => {
  227. NotificationsModule.log(
  228. "STATION_ISSUE",
  229. `UNSCHEDULE - Name: ${payload.name}; Key: ${crypto
  230. .createHash("md5")
  231. .update(`_notification:${payload.name}_`)
  232. .digest("hex")}`
  233. );
  234. NotificationsModule.pub.del(
  235. crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  236. err => {
  237. if (err) reject(err);
  238. else resolve();
  239. }
  240. );
  241. });
  242. }
  243. }
  244. export default new _NotificationsModule();