notifications.js 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. import config from "config";
  2. import crypto from "crypto";
  3. import redis from "redis";
  4. import CoreClass from "../core";
  5. let NotificationsModule;
  6. class _NotificationsModule extends CoreClass {
  7. // eslint-disable-next-line require-jsdoc
  8. constructor() {
  9. super("notifications");
  10. this.subscriptions = [];
  11. NotificationsModule = this;
  12. }
  13. /**
  14. * Initialises the notifications module
  15. *
  16. * @returns {Promise} - returns promise (reject, resolve)
  17. */
  18. initialize() {
  19. return new Promise((resolve, reject) => {
  20. this.pub = redis.createClient({
  21. ...config.get("redis"),
  22. reconnectStrategy: retries => {
  23. if (this.getStatus() !== "LOCKDOWN") {
  24. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  25. this.log("INFO", `Attempting to reconnect.`);
  26. if (retries >= 10) {
  27. this.log("ERROR", `Stopped trying to reconnect.`);
  28. this.setStatus("FAILED");
  29. new Error("Stopped trying to reconnect.");
  30. } else {
  31. Math.min(retries * 50, 500);
  32. }
  33. }
  34. }
  35. });
  36. this.pub.on("error", err => {
  37. if (this.getStatus() === "INITIALIZING") reject(err);
  38. if (this.getStatus() === "LOCKDOWN") return;
  39. this.log("ERROR", `Error ${err.message}.`);
  40. });
  41. this.pub.on("ready", () => {
  42. this.log("INFO", "Pub is ready.");
  43. if (this.getStatus() === "INITIALIZING") resolve();
  44. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  45. this.setStatus("INITIALIZED");
  46. });
  47. this.pub.connect().then(async () => {
  48. this.log("INFO", "Pub connected succesfully.");
  49. this.pub
  50. .sendCommand(["CONFIG", "GET", "notify-keyspace-events"])
  51. .then(response => {
  52. if (response[1] === "xE") {
  53. this.log("INFO", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is set correctly`);
  54. this.log("STATION_ISSUE", `notify-keyspace-events is set correctly`);
  55. } else {
  56. this.log(
  57. "ERROR",
  58. "NOTIFICATIONS_INITIALIZE",
  59. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  60. );
  61. this.log(
  62. "STATION_ISSUE",
  63. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  64. );
  65. }
  66. })
  67. .catch(err => {
  68. this.log(
  69. "ERROR",
  70. "NOTIFICATIONS_INITIALIZE",
  71. `Getting notify-keyspace-events gave an error. ${err}`
  72. );
  73. this.log("STATION_ISSUE", `Getting notify-keyspace-events gave an error. ${err}.`);
  74. });
  75. });
  76. this.sub = this.pub.duplicate();
  77. this.sub.on("error", err => {
  78. if (this.getStatus() === "INITIALIZING") reject(err);
  79. if (this.getStatus() === "LOCKDOWN") return;
  80. this.log("ERROR", `Error ${err.message}.`);
  81. });
  82. this.sub.connect().then(async () => {
  83. this.log("INFO", "Sub connected succesfully.");
  84. if (this.getStatus() === "INITIALIZING") resolve();
  85. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  86. this.setStatus("READY");
  87. this.sub.PSUBSCRIBE(`__keyevent@${this.sub.options.database}__:expired`, (message, channel) => {
  88. this.log("STATION_ISSUE", `PMESSAGE1 - Channel: ${channel}; ExpiredKey: ${message}`);
  89. this.subscriptions.forEach(sub => {
  90. this.log(
  91. "STATION_ISSUE",
  92. `PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${!(sub.name !== message)}`
  93. );
  94. if (sub.name !== message) return;
  95. sub.cb();
  96. });
  97. });
  98. });
  99. });
  100. }
  101. /**
  102. * Schedules a notification to be dispatched in a specific amount of milliseconds,
  103. * notifications are unique by name, and the first one is always kept, as in
  104. * attempting to schedule a notification that already exists won't do anything
  105. *
  106. * @param {object} payload - object containing the payload
  107. * @param {string} payload.name - the name of the notification we want to schedule
  108. * @param {number} payload.time - how long in milliseconds until the notification should be fired
  109. * @param {object} payload.station - the station object related to the notification
  110. * @returns {Promise} - returns a promise (resolve, reject)
  111. */
  112. SCHEDULE(payload) {
  113. return new Promise((resolve, reject) => {
  114. const time = Math.round(payload.time);
  115. if (time <= 0) reject(new Error("Time has to be higher than 0"));
  116. else {
  117. NotificationsModule.log(
  118. "STATION_ISSUE",
  119. `SCHEDULE - Time: ${time}; Name: ${payload.name}; Key: ${crypto
  120. .createHash("md5")
  121. .update(`_notification:${payload.name}_`)
  122. .digest("hex")}; StationId: ${payload.station._id}; StationName: ${payload.station.name}`
  123. );
  124. NotificationsModule.pub
  125. .SET(crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"), "", {
  126. PX: time,
  127. NX: true
  128. })
  129. .then(() => resolve())
  130. .catch(err => reject(new Error(err)));
  131. }
  132. });
  133. }
  134. /**
  135. * Subscribes a callback function to be called when a notification gets called
  136. *
  137. * @param {object} payload - object containing the payload
  138. * @param {string} payload.name - the name of the notification we want to subscribe to
  139. * @param {boolean} payload.unique - only subscribe if another subscription with the same name doesn't already exist
  140. * @param {object} payload.station - the station object related to the notification
  141. * @returns {Promise} - returns a promise (resolve, reject)
  142. */
  143. SUBSCRIBE(payload) {
  144. return new Promise(resolve => {
  145. NotificationsModule.log(
  146. "STATION_ISSUE",
  147. `SUBSCRIBE - Name: ${payload.name}; Key: ${crypto
  148. .createHash("md5")
  149. .update(`_notification:${payload.name}_`)
  150. .digest("hex")}, StationId: ${payload.station._id}; StationName: ${payload.station.name}; Unique: ${
  151. payload.unique
  152. }; SubscriptionExists: ${!!NotificationsModule.subscriptions.find(
  153. subscription => subscription.originalName === payload.name
  154. )};`
  155. );
  156. if (
  157. payload.unique &&
  158. !!NotificationsModule.subscriptions.find(subscription => subscription.originalName === payload.name)
  159. ) {
  160. resolve({
  161. subscription: NotificationsModule.subscriptions.find(
  162. subscription => subscription.originalName === payload.name
  163. )
  164. });
  165. return;
  166. }
  167. const subscription = {
  168. originalName: payload.name,
  169. name: crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  170. cb: payload.cb
  171. };
  172. NotificationsModule.subscriptions.push(subscription);
  173. resolve({ subscription });
  174. });
  175. }
  176. /**
  177. * Remove a notification subscription
  178. *
  179. * @param {object} payload - object containing the payload
  180. * @param {object} payload.subscription - the subscription object returned by {@link subscribe}
  181. * @returns {Promise} - returns a promise (resolve, reject)
  182. */
  183. REMOVE(payload) {
  184. // subscription
  185. return new Promise(resolve => {
  186. const index = NotificationsModule.subscriptions.indexOf(payload.subscription);
  187. if (index) NotificationsModule.subscriptions.splice(index, 1);
  188. resolve();
  189. });
  190. }
  191. /**
  192. * Unschedules a notification by name (each notification has a unique name)
  193. *
  194. * @param {object} payload - object containing the payload
  195. * @param {string} payload.name - the name of the notification we want to schedule
  196. * @returns {Promise} - returns a promise (resolve, reject)
  197. */
  198. UNSCHEDULE(payload) {
  199. // name
  200. return new Promise((resolve, reject) => {
  201. NotificationsModule.log(
  202. "STATION_ISSUE",
  203. `UNSCHEDULE - Name: ${payload.name}; Key: ${crypto
  204. .createHash("md5")
  205. .update(`_notification:${payload.name}_`)
  206. .digest("hex")}`
  207. );
  208. NotificationsModule.pub
  209. .DEL(crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"))
  210. .then(() => resolve())
  211. .catch(err => reject(new Error(err)));
  212. });
  213. }
  214. }
  215. export default new _NotificationsModule();