notifications.js 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. const CoreClass = require("../core.js");
  2. const crypto = require("crypto");
  3. const redis = require("redis");
  4. const config = require("config");
  5. const utils = require("./utils");
  6. const subscriptions = [];
  7. class NotificationsModule extends CoreClass {
  8. constructor() {
  9. super("notifications");
  10. }
  11. initialize() {
  12. return new Promise((resolve, reject) => {
  13. const url = (this.url = config.get("redis").url);
  14. const password = (this.password = config.get("redis").password);
  15. this.pub = redis.createClient({
  16. url,
  17. password,
  18. retry_strategy: (options) => {
  19. if (this.getStatus() === "LOCKDOWN") return;
  20. if (this.getStatus() !== "RECONNECTING")
  21. this.setStatus("RECONNECTING");
  22. this.log("INFO", `Attempting to reconnect.`);
  23. if (options.attempt >= 10) {
  24. this.log("ERROR", `Stopped trying to reconnect.`);
  25. this.setStatus("FAILED");
  26. // this.failed = true;
  27. // this._lockdown();
  28. return undefined;
  29. }
  30. return 3000;
  31. },
  32. });
  33. this.sub = redis.createClient({
  34. url,
  35. password,
  36. retry_strategy: (options) => {
  37. if (this.getStatus() === "LOCKDOWN") return;
  38. if (this.getStatus() !== "RECONNECTING")
  39. this.setStatus("RECONNECTING");
  40. this.log("INFO", `Attempting to reconnect.`);
  41. if (options.attempt >= 10) {
  42. this.log("ERROR", `Stopped trying to reconnect.`);
  43. this.setStatus("FAILED");
  44. // this.failed = true;
  45. // this._lockdown();
  46. return undefined;
  47. }
  48. return 3000;
  49. },
  50. });
  51. this.sub.on("error", (err) => {
  52. if (this.getStatus() === "INITIALIZING") reject(err);
  53. if (this.getStatus() === "LOCKDOWN") return;
  54. this.log("ERROR", `Error ${err.message}.`);
  55. });
  56. this.pub.on("error", (err) => {
  57. if (this.getStatus() === "INITIALIZING") reject(err);
  58. if (this.getStatus() === "LOCKDOWN") return;
  59. this.log("ERROR", `Error ${err.message}.`);
  60. });
  61. this.sub.on("connect", () => {
  62. this.log("INFO", "Sub connected succesfully.");
  63. if (this.getStatus() === "INITIALIZING") resolve();
  64. else if (
  65. this.getStatus() === "LOCKDOWN" ||
  66. this.getStatus() === "RECONNECTING"
  67. )
  68. this.setStatus("READY");
  69. });
  70. this.pub.on("connect", () => {
  71. this.log("INFO", "Pub connected succesfully.");
  72. this.pub.config("GET", "notify-keyspace-events", async (err, response) => {
  73. if (err) {
  74. err = await utils.runJob("GET_ERROR", { error: err });
  75. this.log("ERROR", "NOTIFICATIONS_INITIALIZE", `Getting notify-keyspace-events gave an error. ${err}`);
  76. this.log(
  77. "STATION_ISSUE",
  78. `Getting notify-keyspace-events gave an error. ${err}. ${response}`
  79. );
  80. return;
  81. }
  82. if (response[1] === "xE") {
  83. this.log("INFO", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is set correctly`);
  84. this.log("STATION_ISSUE", `notify-keyspace-events is set correctly`);
  85. } else {
  86. this.log("ERROR", "NOTIFICATIONS_INITIALIZE", `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`);
  87. this.log("STATION_ISSUE", `notify-keyspace-events is NOT correctly! It is set to: ${response[1]}`);
  88. }
  89. });
  90. if (this.getStatus() === "INITIALIZING") resolve();
  91. else if (
  92. this.getStatus() === "LOCKDOWN" ||
  93. this.getStatus() === "RECONNECTING"
  94. )
  95. this.setStatus("INITIALIZED");
  96. });
  97. this.sub.on("pmessage", (pattern, channel, expiredKey) => {
  98. this.log(
  99. "STATION_ISSUE",
  100. `PMESSAGE1 - Pattern: ${pattern}; Channel: ${channel}; ExpiredKey: ${expiredKey}`
  101. );
  102. subscriptions.forEach((sub) => {
  103. this.log(
  104. "STATION_ISSUE",
  105. `PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${!(
  106. sub.name !== expiredKey
  107. )}`
  108. );
  109. if (sub.name !== expiredKey) return;
  110. sub.cb();
  111. });
  112. });
  113. this.sub.psubscribe(`__keyevent@${this.pub.options.db}__:expired`);
  114. });
  115. }
  116. /**
  117. * Schedules a notification to be dispatched in a specific amount of milliseconds,
  118. * notifications are unique by name, and the first one is always kept, as in
  119. * attempting to schedule a notification that already exists won't do anything
  120. *
  121. * @param {String} name - the name of the notification we want to schedule
  122. * @param {Integer} time - how long in milliseconds until the notification should be fired
  123. * @param {Function} cb - gets called when the notification has been scheduled
  124. */
  125. SCHEDULE(payload) {
  126. //name, time, cb, station
  127. return new Promise((resolve, reject) => {
  128. const time = Math.round(payload.time);
  129. this.log(
  130. "STATION_ISSUE",
  131. `SCHEDULE - Time: ${time}; Name: ${payload.name}; Key: ${crypto
  132. .createHash("md5")
  133. .update(`_notification:${payload.name}_`)
  134. .digest("hex")}; StationId: ${
  135. payload.station._id
  136. }; StationName: ${payload.station.name}`
  137. );
  138. this.pub.set(
  139. crypto
  140. .createHash("md5")
  141. .update(`_notification:${payload.name}_`)
  142. .digest("hex"),
  143. "",
  144. "PX",
  145. time,
  146. "NX",
  147. (err) => {
  148. if (err) reject(err);
  149. else resolve();
  150. }
  151. );
  152. });
  153. }
  154. /**
  155. * Subscribes a callback function to be called when a notification gets called
  156. *
  157. * @param {String} name - the name of the notification we want to subscribe to
  158. * @param {Function} cb - gets called when the subscribed notification gets called
  159. * @param {Boolean} unique - only subscribe if another subscription with the same name doesn't already exist
  160. * @return {Object} - the subscription object
  161. */
  162. SUBSCRIBE(payload) {
  163. //name, cb, unique = false, station
  164. return new Promise((resolve, reject) => {
  165. this.log(
  166. "STATION_ISSUE",
  167. `SUBSCRIBE - Name: ${payload.name}; Key: ${crypto
  168. .createHash("md5")
  169. .update(`_notification:${payload.name}_`)
  170. .digest("hex")}, StationId: ${
  171. payload.station._id
  172. }; StationName: ${payload.station.name}; Unique: ${
  173. payload.unique
  174. }; SubscriptionExists: ${!!subscriptions.find(
  175. (subscription) => subscription.originalName === payload.name
  176. )};`
  177. );
  178. if (
  179. payload.unique &&
  180. !!subscriptions.find(
  181. (subscription) => subscription.originalName === payload.name
  182. )
  183. )
  184. return resolve({
  185. subscription: subscriptions.find(
  186. (subscription) =>
  187. subscription.originalName === payload.name
  188. ),
  189. });
  190. let subscription = {
  191. originalName: payload.name,
  192. name: crypto
  193. .createHash("md5")
  194. .update(`_notification:${payload.name}_`)
  195. .digest("hex"),
  196. cb: payload.cb,
  197. };
  198. subscriptions.push(subscription);
  199. resolve({ subscription });
  200. });
  201. }
  202. /**
  203. * Remove a notification subscription
  204. *
  205. * @param {Object} subscription - the subscription object returned by {@link subscribe}
  206. */
  207. REMOVE(payload) {
  208. //subscription
  209. return new Promise((resolve, reject) => {
  210. let index = subscriptions.indexOf(payload.subscription);
  211. if (index) subscriptions.splice(index, 1);
  212. resolve();
  213. });
  214. }
  215. UNSCHEDULE(payload) {
  216. //name
  217. return new Promise((resolve, reject) => {
  218. this.log(
  219. "STATION_ISSUE",
  220. `UNSCHEDULE - Name: ${payload.name}; Key: ${crypto
  221. .createHash("md5")
  222. .update(`_notification:${payload.name}_`)
  223. .digest("hex")}`
  224. );
  225. this.pub.del(
  226. crypto
  227. .createHash("md5")
  228. .update(`_notification:${payload.name}_`)
  229. .digest("hex"),
  230. (err) => {
  231. if (err) reject(err);
  232. else resolve();
  233. }
  234. );
  235. });
  236. }
  237. }
  238. module.exports = new NotificationsModule();