notifications.js 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. import config from "config";
  2. import crypto from "crypto";
  3. import redis from "redis";
  4. import CoreClass from "../core";
  5. let NotificationsModule;
  6. let UtilsModule;
  7. class _NotificationsModule extends CoreClass {
  8. // eslint-disable-next-line require-jsdoc
  9. constructor() {
  10. super("notifications");
  11. this.subscriptions = [];
  12. NotificationsModule = this;
  13. }
  14. /**
  15. * Initialises the notifications module
  16. *
  17. * @returns {Promise} - returns promise (reject, resolve)
  18. */
  19. initialize() {
  20. return new Promise((resolve, reject) => {
  21. const url = (this.url = config.get("redis").url);
  22. const password = (this.password = config.get("redis").password);
  23. UtilsModule = this.moduleManager.modules.utils;
  24. this.pub = redis.createClient({
  25. url,
  26. password,
  27. retry_strategy: options => {
  28. if (this.getStatus() === "LOCKDOWN") return;
  29. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  30. this.log("INFO", `Attempting to reconnect.`);
  31. if (options.attempt >= 10) {
  32. this.log("ERROR", `Stopped trying to reconnect.`);
  33. this.setStatus("FAILED");
  34. // this.failed = true;
  35. // this._lockdown();
  36. }
  37. }
  38. });
  39. this.sub = redis.createClient({
  40. url,
  41. password,
  42. retry_strategy: options => {
  43. if (this.getStatus() === "LOCKDOWN") return;
  44. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  45. this.log("INFO", `Attempting to reconnect.`);
  46. if (options.attempt >= 10) {
  47. this.log("ERROR", `Stopped trying to reconnect.`);
  48. this.setStatus("FAILED");
  49. // this.failed = true;
  50. // this._lockdown();
  51. }
  52. }
  53. });
  54. this.sub.on("error", err => {
  55. if (this.getStatus() === "INITIALIZING") reject(err);
  56. if (this.getStatus() === "LOCKDOWN") return;
  57. this.log("ERROR", `Error ${err.message}.`);
  58. });
  59. this.pub.on("error", err => {
  60. if (this.getStatus() === "INITIALIZING") reject(err);
  61. if (this.getStatus() === "LOCKDOWN") return;
  62. this.log("ERROR", `Error ${err.message}.`);
  63. });
  64. this.sub.on("connect", () => {
  65. this.log("INFO", "Sub connected succesfully.");
  66. if (this.getStatus() === "INITIALIZING") resolve();
  67. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  68. this.setStatus("READY");
  69. });
  70. this.pub.on("connect", () => {
  71. this.log("INFO", "Pub connected succesfully.");
  72. this.pub.config("GET", "notify-keyspace-events", async (err, response) => {
  73. if (err) {
  74. const formattedErr = await UtilsModule.runJob(
  75. "GET_ERROR",
  76. {
  77. error: err
  78. },
  79. this
  80. );
  81. this.log(
  82. "ERROR",
  83. "NOTIFICATIONS_INITIALIZE",
  84. `Getting notify-keyspace-events gave an error. ${formattedErr}`
  85. );
  86. this.log(
  87. "STATION_ISSUE",
  88. `Getting notify-keyspace-events gave an error. ${formattedErr}. ${response}`
  89. );
  90. return;
  91. }
  92. if (response[1] === "xE") {
  93. this.log("INFO", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is set correctly`);
  94. this.log("STATION_ISSUE", `notify-keyspace-events is set correctly`);
  95. } else {
  96. this.log(
  97. "ERROR",
  98. "NOTIFICATIONS_INITIALIZE",
  99. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  100. );
  101. this.log(
  102. "STATION_ISSUE",
  103. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  104. );
  105. }
  106. });
  107. if (this.getStatus() === "INITIALIZING") resolve();
  108. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  109. this.setStatus("INITIALIZED");
  110. });
  111. this.sub.on("pmessage", (pattern, channel, expiredKey) => {
  112. this.log(
  113. "STATION_ISSUE",
  114. `PMESSAGE1 - Pattern: ${pattern}; Channel: ${channel}; ExpiredKey: ${expiredKey}`
  115. );
  116. this.subscriptions.forEach(sub => {
  117. this.log(
  118. "STATION_ISSUE",
  119. `PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${!(sub.name !== expiredKey)}`
  120. );
  121. if (sub.name !== expiredKey) return;
  122. sub.cb();
  123. });
  124. });
  125. this.sub.psubscribe(`__keyevent@${this.pub.options.db}__:expired`);
  126. });
  127. }
  128. /**
  129. * Schedules a notification to be dispatched in a specific amount of milliseconds,
  130. * notifications are unique by name, and the first one is always kept, as in
  131. * attempting to schedule a notification that already exists won't do anything
  132. *
  133. * @param {object} payload - object containing the payload
  134. * @param {string} payload.name - the name of the notification we want to schedule
  135. * @param {number} payload.time - how long in milliseconds until the notification should be fired
  136. * @param {object} payload.station - the station object related to the notification
  137. * @returns {Promise} - returns a promise (resolve, reject)
  138. */
  139. SCHEDULE(payload) {
  140. return new Promise((resolve, reject) => {
  141. const time = Math.round(payload.time);
  142. if (time <= 0) reject(new Error("Time has to be higher than 0"));
  143. else {
  144. NotificationsModule.log(
  145. "STATION_ISSUE",
  146. `SCHEDULE - Time: ${time}; Name: ${payload.name}; Key: ${crypto
  147. .createHash("md5")
  148. .update(`_notification:${payload.name}_`)
  149. .digest("hex")}; StationId: ${payload.station._id}; StationName: ${payload.station.name}`
  150. );
  151. NotificationsModule.pub.set(
  152. crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  153. "",
  154. "PX",
  155. time,
  156. "NX",
  157. err => {
  158. if (err) reject(err);
  159. else resolve();
  160. }
  161. );
  162. }
  163. });
  164. }
  165. /**
  166. * Subscribes a callback function to be called when a notification gets called
  167. *
  168. * @param {object} payload - object containing the payload
  169. * @param {string} payload.name - the name of the notification we want to subscribe to
  170. * @param {boolean} payload.unique - only subscribe if another subscription with the same name doesn't already exist
  171. * @param {object} payload.station - the station object related to the notification
  172. * @returns {Promise} - returns a promise (resolve, reject)
  173. */
  174. SUBSCRIBE(payload) {
  175. return new Promise(resolve => {
  176. NotificationsModule.log(
  177. "STATION_ISSUE",
  178. `SUBSCRIBE - Name: ${payload.name}; Key: ${crypto
  179. .createHash("md5")
  180. .update(`_notification:${payload.name}_`)
  181. .digest("hex")}, StationId: ${payload.station._id}; StationName: ${payload.station.name}; Unique: ${
  182. payload.unique
  183. }; SubscriptionExists: ${!!NotificationsModule.subscriptions.find(
  184. subscription => subscription.originalName === payload.name
  185. )};`
  186. );
  187. if (
  188. payload.unique &&
  189. !!NotificationsModule.subscriptions.find(subscription => subscription.originalName === payload.name)
  190. )
  191. return resolve({
  192. subscription: NotificationsModule.subscriptions.find(
  193. subscription => subscription.originalName === payload.name
  194. )
  195. });
  196. const subscription = {
  197. originalName: payload.name,
  198. name: crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  199. cb: payload.cb
  200. };
  201. NotificationsModule.subscriptions.push(subscription);
  202. return resolve({ subscription });
  203. });
  204. }
  205. /**
  206. * Remove a notification subscription
  207. *
  208. * @param {object} payload - object containing the payload
  209. * @param {object} payload.subscription - the subscription object returned by {@link subscribe}
  210. * @returns {Promise} - returns a promise (resolve, reject)
  211. */
  212. REMOVE(payload) {
  213. // subscription
  214. return new Promise(resolve => {
  215. const index = NotificationsModule.subscriptions.indexOf(payload.subscription);
  216. if (index) NotificationsModule.subscriptions.splice(index, 1);
  217. resolve();
  218. });
  219. }
  220. /**
  221. * Unschedules a notification by name (each notification has a unique name)
  222. *
  223. * @param {object} payload - object containing the payload
  224. * @param {string} payload.name - the name of the notification we want to schedule
  225. * @returns {Promise} - returns a promise (resolve, reject)
  226. */
  227. UNSCHEDULE(payload) {
  228. // name
  229. return new Promise((resolve, reject) => {
  230. NotificationsModule.log(
  231. "STATION_ISSUE",
  232. `UNSCHEDULE - Name: ${payload.name}; Key: ${crypto
  233. .createHash("md5")
  234. .update(`_notification:${payload.name}_`)
  235. .digest("hex")}`
  236. );
  237. NotificationsModule.pub.del(
  238. crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  239. err => {
  240. if (err) reject(err);
  241. else resolve();
  242. }
  243. );
  244. });
  245. }
  246. }
  247. export default new _NotificationsModule();