// notifications.js
import config from "config";
import crypto from "crypto";
import redis from "redis";

import CoreClass from "../core";
  5. let NotificationsModule;
  6. class _NotificationsModule extends CoreClass {
  7. // eslint-disable-next-line require-jsdoc
  8. constructor() {
  9. super("notifications");
  10. this.subscriptions = [];
  11. NotificationsModule = this;
  12. }
  13. /**
  14. * Initialises the notifications module
  15. *
  16. * @returns {Promise} - returns promise (reject, resolve)
  17. */
  18. initialize() {
  19. return new Promise((resolve, reject) => {
  20. const url = (this.url = config.get("redis").url);
  21. const password = (this.password = config.get("redis").password);
  22. this.pub = redis.createClient({
  23. url,
  24. password,
  25. reconnectStrategy: retries => {
  26. if (this.getStatus() !== "LOCKDOWN") {
  27. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  28. this.log("INFO", `Attempting to reconnect.`);
  29. if (retries >= 10) {
  30. this.log("ERROR", `Stopped trying to reconnect.`);
  31. this.setStatus("FAILED");
  32. new Error("Stopped trying to reconnect.");
  33. } else {
  34. Math.min(retries * 50, 500);
  35. }
  36. }
  37. }
  38. });
  39. this.pub.connect().then(async () => {
  40. this.log("INFO", "Pub connected succesfully.");
  41. this.pub
  42. .sendCommand(["CONFIG", "GET", "notify-keyspace-events"])
  43. .then(response => {
  44. if (response[1] === "xE") {
  45. this.log("INFO", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is set correctly`);
  46. this.log("STATION_ISSUE", `notify-keyspace-events is set correctly`);
  47. } else {
  48. this.log(
  49. "ERROR",
  50. "NOTIFICATIONS_INITIALIZE",
  51. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  52. );
  53. this.log(
  54. "STATION_ISSUE",
  55. `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`
  56. );
  57. }
  58. })
  59. .catch(err => {
  60. this.log(
  61. "ERROR",
  62. "NOTIFICATIONS_INITIALIZE",
  63. `Getting notify-keyspace-events gave an error. ${err}`
  64. );
  65. this.log("STATION_ISSUE", `Getting notify-keyspace-events gave an error. ${err}.`);
  66. });
  67. if (this.getStatus() === "INITIALIZING") resolve();
  68. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  69. this.setStatus("INITIALIZED");
  70. });
  71. this.sub = redis.createClient({
  72. url,
  73. password,
  74. reconnectStrategy: retries => {
  75. if (this.getStatus() !== "LOCKDOWN") {
  76. if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
  77. this.log("INFO", `Attempting to reconnect.`);
  78. if (retries >= 10) {
  79. this.log("ERROR", `Stopped trying to reconnect.`);
  80. this.setStatus("FAILED");
  81. new Error("Stopped trying to reconnect.");
  82. } else {
  83. Math.min(retries * 50, 500);
  84. }
  85. }
  86. }
  87. });
  88. this.sub.connect().then(async () => {
  89. this.log("INFO", "Sub connected succesfully.");
  90. if (this.getStatus() === "INITIALIZING") resolve();
  91. else if (this.getStatus() === "LOCKDOWN" || this.getStatus() === "RECONNECTING")
  92. this.setStatus("READY");
  93. this.sub.PSUBSCRIBE(`__keyevent@${this.pub.options.db}__:expired`, (message, channel) => {
  94. this.log("STATION_ISSUE", `PMESSAGE1 - Channel: ${channel}; ExpiredKey: ${message}`);
  95. this.subscriptions.forEach(sub => {
  96. this.log(
  97. "STATION_ISSUE",
  98. `PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${!(sub.name !== message)}`
  99. );
  100. if (sub.name !== message) return;
  101. sub.cb();
  102. });
  103. });
  104. });
  105. this.sub.on("error", err => {
  106. if (this.getStatus() === "INITIALIZING") reject(err);
  107. if (this.getStatus() === "LOCKDOWN") return;
  108. this.log("ERROR", `Error ${err.message}.`);
  109. });
  110. this.pub.on("error", err => {
  111. if (this.getStatus() === "INITIALIZING") reject(err);
  112. if (this.getStatus() === "LOCKDOWN") return;
  113. this.log("ERROR", `Error ${err.message}.`);
  114. });
  115. });
  116. }
  117. /**
  118. * Schedules a notification to be dispatched in a specific amount of milliseconds,
  119. * notifications are unique by name, and the first one is always kept, as in
  120. * attempting to schedule a notification that already exists won't do anything
  121. *
  122. * @param {object} payload - object containing the payload
  123. * @param {string} payload.name - the name of the notification we want to schedule
  124. * @param {number} payload.time - how long in milliseconds until the notification should be fired
  125. * @param {object} payload.station - the station object related to the notification
  126. * @returns {Promise} - returns a promise (resolve, reject)
  127. */
  128. SCHEDULE(payload) {
  129. return new Promise((resolve, reject) => {
  130. const time = Math.round(payload.time);
  131. if (time <= 0) reject(new Error("Time has to be higher than 0"));
  132. else {
  133. NotificationsModule.log(
  134. "STATION_ISSUE",
  135. `SCHEDULE - Time: ${time}; Name: ${payload.name}; Key: ${crypto
  136. .createHash("md5")
  137. .update(`_notification:${payload.name}_`)
  138. .digest("hex")}; StationId: ${payload.station._id}; StationName: ${payload.station.name}`
  139. );
  140. NotificationsModule.pub
  141. .SET(
  142. crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  143. "",
  144. "PX",
  145. time,
  146. "NX"
  147. )
  148. .then(() => resolve())
  149. .catch(err => reject(new Error(err)));
  150. }
  151. });
  152. }
  153. /**
  154. * Subscribes a callback function to be called when a notification gets called
  155. *
  156. * @param {object} payload - object containing the payload
  157. * @param {string} payload.name - the name of the notification we want to subscribe to
  158. * @param {boolean} payload.unique - only subscribe if another subscription with the same name doesn't already exist
  159. * @param {object} payload.station - the station object related to the notification
  160. * @returns {Promise} - returns a promise (resolve, reject)
  161. */
  162. SUBSCRIBE(payload) {
  163. return new Promise(resolve => {
  164. NotificationsModule.log(
  165. "STATION_ISSUE",
  166. `SUBSCRIBE - Name: ${payload.name}; Key: ${crypto
  167. .createHash("md5")
  168. .update(`_notification:${payload.name}_`)
  169. .digest("hex")}, StationId: ${payload.station._id}; StationName: ${payload.station.name}; Unique: ${
  170. payload.unique
  171. }; SubscriptionExists: ${!!NotificationsModule.subscriptions.find(
  172. subscription => subscription.originalName === payload.name
  173. )};`
  174. );
  175. if (
  176. payload.unique &&
  177. !!NotificationsModule.subscriptions.find(subscription => subscription.originalName === payload.name)
  178. ) {
  179. resolve({
  180. subscription: NotificationsModule.subscriptions.find(
  181. subscription => subscription.originalName === payload.name
  182. )
  183. });
  184. return;
  185. }
  186. const subscription = {
  187. originalName: payload.name,
  188. name: crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"),
  189. cb: payload.cb
  190. };
  191. NotificationsModule.subscriptions.push(subscription);
  192. resolve({ subscription });
  193. });
  194. }
  195. /**
  196. * Remove a notification subscription
  197. *
  198. * @param {object} payload - object containing the payload
  199. * @param {object} payload.subscription - the subscription object returned by {@link subscribe}
  200. * @returns {Promise} - returns a promise (resolve, reject)
  201. */
  202. REMOVE(payload) {
  203. // subscription
  204. return new Promise(resolve => {
  205. const index = NotificationsModule.subscriptions.indexOf(payload.subscription);
  206. if (index) NotificationsModule.subscriptions.splice(index, 1);
  207. resolve();
  208. });
  209. }
  210. /**
  211. * Unschedules a notification by name (each notification has a unique name)
  212. *
  213. * @param {object} payload - object containing the payload
  214. * @param {string} payload.name - the name of the notification we want to schedule
  215. * @returns {Promise} - returns a promise (resolve, reject)
  216. */
  217. UNSCHEDULE(payload) {
  218. // name
  219. return new Promise((resolve, reject) => {
  220. NotificationsModule.log(
  221. "STATION_ISSUE",
  222. `UNSCHEDULE - Name: ${payload.name}; Key: ${crypto
  223. .createHash("md5")
  224. .update(`_notification:${payload.name}_`)
  225. .digest("hex")}`
  226. );
  227. NotificationsModule.pub
  228. .DEL(crypto.createHash("md5").update(`_notification:${payload.name}_`).digest("hex"))
  229. .then(() => resolve())
  230. .catch(err => reject(new Error(err)));
  231. });
  232. }
  233. }
  234. export default new _NotificationsModule();