notifications.js 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. import config from "config";
  2. import crypto from "crypto";
  3. import redis from "redis";
  4. import CoreClass from "../core";
  5. let NotificationsModule;
  6. let UtilsModule;
  7. class _NotificationsModule extends CoreClass {
  8. // eslint-disable-next-line require-jsdoc
  9. constructor() {
  10. super("notifications");
  11. this.subscriptions = [];
  12. NotificationsModule = this;
  13. }
  14. /**
  15. * Initialises the notifications module
  16. *
  17. * @returns {Promise} - returns promise (reject, resolve)
  18. */
  19. initialize() {
  20. return new Promise((resolve, reject) => {
  21. const url = (this.url = config.get("redis").url);
  22. const password = (this.password = config.get("redis").password);
  23. UtilsModule = this.moduleManager.modules.utils;
  24. this.pub = redis.createClient({
  25. url,
  26. password,
  27. retry_strategy: options => {
  28. if (this.getStatus() === "LOCKDOWN") return;
  29. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  30. this.log("INFO", `Attempting to reconnect.`);
  31. if (options.attempt >= 10) {
  32. this.log("ERROR", `Stopped trying to reconnect.`);
  33. this.setStatus("FAILED");
  34. }
  35. }
  36. });
  37. this.sub = redis.createClient({
  38. url,
  39. password,
  40. retry_strategy: options => {
  41. if (this.getStatus() === "LOCKDOWN") return;
  42. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  43. this.log("INFO", `Attempting to reconnect.`);
  44. if (options.attempt >= 10) {
  45. this.log("ERROR", `Stopped trying to reconnect.`);
  46. this.setStatus("FAILED");
  47. }
  48. }
  49. });
  50. this.sub.on("error", err => {
  51. if (this.getStatus() === "INITIALIZING") reject(err);
  52. if (this.getStatus() === "LOCKDOWN") return;
  53. this.log("ERROR", `Error ${err.message}.`);
  54. });
  55. this.pub.on("error", err => {
  56. if (this.getStatus() === "INITIALIZING") reject(err);
  57. if (this.getStatus() === "LOCKDOWN") return;
  58. this.log("ERROR", `Error ${err.message}.`);
  59. });
  60. this.sub.on("connect", () => {
  61. this.log("INFO", "Sub connected succesfully.");
  62. if (this.getStatus() === "INITIALIZING") resolve();
  63. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  64. this.setStatus("READY");
  65. });
  66. this.pub.on("connect", () => {
  67. this.log("INFO", "Pub connected succesfully.");
  68. this.pub.config("GET", "notify-keyspace-events", async (err, response) => {
  69. if (err) {
  70. const formattedErr = await UtilsModule.runJob(
  71. "GET_ERROR",
  72. {
  73. error: err
  74. },
  75. this
  76. );
  77. this.log(
  78. "ERROR",
  79. "NOTIFICATIONS_INITIALIZE",
  80. `Getting notify-keyspace-events gave an error. ${formattedErr}`
  81. );
  82. this.log(
  83. "STATION_ISSUE",
  84. `Getting notify-keyspace-events gave an error. ${formattedErr}. ${response}`
  85. );
  86. return;
  87. }
  88. if (response[1] === "xE") {
  89. this.log("INFO", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is set correctly`);
  90. this.log("STATION_ISSUE", `notify-keyspace-events is set correctly`);
  91. } else {
  92. this.log(
  93. "ERROR",
  94. "NOTIFICATIONS_INITIALIZE",
  95. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  96. );
  97. this.log(
  98. "STATION_ISSUE",
  99. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  100. );
  101. }
  102. });
  103. if (this.getStatus() === "INITIALIZING") resolve();
  104. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  105. this.setStatus("INITIALIZED");
  106. });
  107. this.sub.on("pmessage", (pattern, channel, expiredKey) => {
  108. this.log(
  109. "STATION_ISSUE",
  110. `PMESSAGE1 - Pattern: ${pattern}; Channel: ${channel}; ExpiredKey: ${expiredKey}`
  111. );
  112. this.subscriptions.forEach(sub => {
  113. this.log(
  114. "STATION_ISSUE",
  115. `PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${!(sub.name !== expiredKey)}`
  116. );
  117. if (sub.name !== expiredKey) return;
  118. sub.cb();
  119. });
  120. });
  121. this.sub.psubscribe(`__keyevent@${this.pub.options.db}__:expired`);
  122. });
  123. }
  124. /**
  125. * Schedules a notification to be dispatched in a specific amount of milliseconds,
  126. * notifications are unique by name, and the first one is always kept, as in
  127. * attempting to schedule a notification that already exists won't do anything
  128. *
  129. * @param {object} payload - object containing the payload
  130. * @param {string} payload.name - the name of the notification we want to schedule
  131. * @param {number} payload.time - how long in milliseconds until the notification should be fired
  132. * @param {object} payload.station - the station object related to the notification
  133. * @returns {Promise} - returns a promise (resolve, reject)
  134. */
  135. SCHEDULE(payload) {
  136. return new Promise((resolve, reject) => {
  137. const time = Math.round(payload.time);
  138. if (time <= 0) reject(new Error("Time has to be higher than 0"));
  139. else {
  140. NotificationsModule.log(
  141. "STATION_ISSUE",
  142. `SCHEDULE - Time: ${time}; Name: ${payload.name}; Key: ${crypto
  143. .createHash("md5")
  144. .update(`_notification:${payload.name}_`)
  145. .digest("hex")}; StationId: ${payload.station._id}; StationName: ${payload.station.name}`
  146. );
  147. NotificationsModule.pub.set(
  148. crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  149. "",
  150. "PX",
  151. time,
  152. "NX",
  153. err => {
  154. if (err) reject(err);
  155. else resolve();
  156. }
  157. );
  158. }
  159. });
  160. }
  161. /**
  162. * Subscribes a callback function to be called when a notification gets called
  163. *
  164. * @param {object} payload - object containing the payload
  165. * @param {string} payload.name - the name of the notification we want to subscribe to
  166. * @param {boolean} payload.unique - only subscribe if another subscription with the same name doesn't already exist
  167. * @param {object} payload.station - the station object related to the notification
  168. * @returns {Promise} - returns a promise (resolve, reject)
  169. */
  170. SUBSCRIBE(payload) {
  171. return new Promise(resolve => {
  172. NotificationsModule.log(
  173. "STATION_ISSUE",
  174. `SUBSCRIBE - Name: ${payload.name}; Key: ${crypto
  175. .createHash("md5")
  176. .update(`_notification:${payload.name}_`)
  177. .digest("hex")}, StationId: ${payload.station._id}; StationName: ${payload.station.name}; Unique: ${
  178. payload.unique
  179. }; SubscriptionExists: ${!!NotificationsModule.subscriptions.find(
  180. subscription => subscription.originalName === payload.name
  181. )};`
  182. );
  183. if (
  184. payload.unique &&
  185. !!NotificationsModule.subscriptions.find(subscription => subscription.originalName === payload.name)
  186. ) {
  187. resolve({
  188. subscription: NotificationsModule.subscriptions.find(
  189. subscription => subscription.originalName === payload.name
  190. )
  191. });
  192. return;
  193. }
  194. const subscription = {
  195. originalName: payload.name,
  196. name: crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  197. cb: payload.cb
  198. };
  199. NotificationsModule.subscriptions.push(subscription);
  200. resolve({ subscription });
  201. });
  202. }
  203. /**
  204. * Remove a notification subscription
  205. *
  206. * @param {object} payload - object containing the payload
  207. * @param {object} payload.subscription - the subscription object returned by {@link subscribe}
  208. * @returns {Promise} - returns a promise (resolve, reject)
  209. */
  210. REMOVE(payload) {
  211. // subscription
  212. return new Promise(resolve => {
  213. const index = NotificationsModule.subscriptions.indexOf(payload.subscription);
  214. if (index) NotificationsModule.subscriptions.splice(index, 1);
  215. resolve();
  216. });
  217. }
  218. /**
  219. * Unschedules a notification by name (each notification has a unique name)
  220. *
  221. * @param {object} payload - object containing the payload
  222. * @param {string} payload.name - the name of the notification we want to schedule
  223. * @returns {Promise} - returns a promise (resolve, reject)
  224. */
  225. UNSCHEDULE(payload) {
  226. // name
  227. return new Promise((resolve, reject) => {
  228. NotificationsModule.log(
  229. "STATION_ISSUE",
  230. `UNSCHEDULE - Name: ${payload.name}; Key: ${crypto
  231. .createHash("md5")
  232. .update(`_notification:${payload.name}_`)
  233. .digest("hex")}`
  234. );
  235. NotificationsModule.pub.del(
  236. crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  237. err => {
  238. if (err) reject(err);
  239. else resolve();
  240. }
  241. );
  242. });
  243. }
  244. }
  245. export default new _NotificationsModule();