notifications.js 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. const CoreClass = require("../core.js");
  2. const crypto = require("crypto");
  3. const redis = require("redis");
  4. const config = require("config");
  // Module-level registry of subscription objects ({ originalName, name, cb }).
  // SUBSCRIBE pushes onto it, REMOVE splices from it, and the Redis "pmessage"
  // handler registered in initialize() scans it for every expired key.
  5. const subscriptions = [];
  /**
   * Derives the Redis key under which a notification is stored.
   *
   * The key is the hex md5 digest of `_notification:<name>_`. The same value is
   * what Redis reports back in the key-expired event, so it is also what
   * subscriptions are matched against.
   *
   * @param {String} name - the human-readable notification name
   * @return {String} - the hex digest used as Redis key / subscription name
   */
  function getNotificationKey(name) {
    return crypto
      .createHash("md5")
      .update(`_notification:${name}_`)
      .digest("hex");
  }

  /**
   * Timer-like notifications built on Redis key expiry.
   *
   * SCHEDULE stores an empty value under a hashed key with a millisecond TTL.
   * When Redis reports the key as expired on the `__keyevent@0__:expired`
   * channel, every subscription registered through SUBSCRIBE whose hashed name
   * equals the expired key has its callback invoked.
   *
   * Two Redis clients are kept: `this.pub` issues SET/DEL, `this.sub` is
   * dedicated to the pattern subscription.
   */
  class NotificationsModule extends CoreClass {
    constructor() {
      super("notifications");
    }

    /**
     * Creates both Redis clients, wires their event handlers and subscribes to
     * key-expired events.
     *
     * @return {Promise} - resolves when either client connects while the module
     *   status is "INITIALIZING"; rejects with the client error if one is
     *   emitted while the status is still "INITIALIZING"
     */
    initialize() {
      return new Promise((resolve, reject) => {
        const { url, password } = config.get("redis");
        this.url = url;
        this.password = password;

        // Shared by both clients: retry every 3 seconds, give up (and mark the
        // module FAILED) once the attempt counter reaches 10. Returning
        // undefined tells the client to stop reconnecting.
        const retryStrategy = (options) => {
          if (this.getStatus() === "LOCKDOWN") return undefined;
          if (this.getStatus() !== "RECONNECTING") {
            this.setStatus("RECONNECTING");
          }
          this.log("INFO", `Attempting to reconnect.`);
          if (options.attempt >= 10) {
            this.log("ERROR", `Stopped trying to reconnect.`);
            this.setStatus("FAILED");
            return undefined;
          }
          return 3000;
        };

        // Shared by both clients: an error during start-up fails
        // initialization; errors are not logged while in LOCKDOWN.
        const handleError = (err) => {
          if (this.getStatus() === "INITIALIZING") reject(err);
          if (this.getStatus() === "LOCKDOWN") return;
          this.log("ERROR", `Error ${err.message}.`);
        };

        this.pub = redis.createClient({
          url,
          password,
          retry_strategy: retryStrategy,
        });
        this.sub = redis.createClient({
          url,
          password,
          retry_strategy: retryStrategy,
        });

        this.sub.on("error", handleError);
        this.pub.on("error", handleError);

        this.sub.on("connect", () => {
          this.log("INFO", "Sub connected succesfully.");
          const status = this.getStatus();
          if (status === "INITIALIZING") {
            resolve();
          } else if (status === "LOCKDOWN" || status === "RECONNECTING") {
            this.setStatus("READY");
          }
        });

        this.pub.on("connect", () => {
          this.log("INFO", "Pub connected succesfully.");
          const status = this.getStatus();
          if (status === "INITIALIZING") {
            resolve();
          } else if (status === "LOCKDOWN" || status === "RECONNECTING") {
            // NOTE(review): the sub client restores "READY" here while this one
            // sets "INITIALIZED" - looks inconsistent; confirm against the
            // statuses CoreClass understands before changing.
            this.setStatus("INITIALIZED");
          }
        });

        // For key-expired events the message payload is the name of the key
        // that expired, i.e. the hash produced by getNotificationKey().
        this.sub.on("pmessage", (pattern, channel, expiredKey) => {
          this.log(
            "STATION_ISSUE",
            `PMESSAGE1 - Pattern: ${pattern}; Channel: ${channel}; ExpiredKey: ${expiredKey}`
          );
          subscriptions.forEach((sub) => {
            const matches = sub.name === expiredKey;
            this.log(
              "STATION_ISSUE",
              `PMESSAGE2 - Sub name: ${sub.name}; Calls cb: ${matches}`
            );
            if (!matches) return;
            sub.cb();
          });
        });

        // Expired-key events of database 0. Redis only publishes these when
        // keyspace notifications are enabled on the server.
        this.sub.psubscribe("__keyevent@0__:expired");
      });
    }

    /**
     * Schedules a notification to fire after a number of milliseconds.
     *
     * The key is written with NX, so scheduling a name that is already pending
     * leaves the existing notification (and its remaining time) untouched.
     *
     * @param {Object} payload - the job payload
     * @param {String} payload.name - the name of the notification to schedule
     * @param {Number} payload.time - delay in milliseconds; rounded to the nearest integer
     * @param {Object} payload.station - station the notification belongs to; its
     *   `_id` and `name` are read for logging, so it must be provided
     * @return {Promise} - resolves once Redis has answered the SET; a Redis
     *   error is logged and the promise still resolves
     */
    SCHEDULE(payload) {
      return new Promise((resolve) => {
        const time = Math.round(payload.time);
        const key = getNotificationKey(payload.name);
        this.log(
          "STATION_ISSUE",
          `SCHEDULE - Time: ${time}; Name: ${payload.name}; Key: ${key}; StationId: ${payload.station._id}; StationName: ${payload.station.name}`
        );
        this.pub.set(key, "", "PX", time, "NX", (err) => {
          // Previously the error argument was ignored entirely; surface it in
          // the log while keeping the "always resolves" contract for callers.
          if (err) {
            this.log(
              "ERROR",
              `Failed to schedule notification "${payload.name}": ${err.message}.`
            );
          }
          resolve();
        });
      });
    }

    /**
     * Registers a callback to be invoked when the named notification fires.
     *
     * @param {Object} payload - the job payload
     * @param {String} payload.name - the name of the notification to subscribe to
     * @param {Function} payload.cb - called (without arguments) when the notification fires
     * @param {Boolean} [payload.unique] - when truthy and a subscription with the
     *   same name already exists, that existing subscription is returned and no
     *   new one is added
     * @param {Object} payload.station - station the subscription belongs to; its
     *   `_id` and `name` are read for logging, so it must be provided
     * @return {Promise<Object>} - resolves with `{ subscription }`, the object to
     *   pass to REMOVE later
     */
    SUBSCRIBE(payload) {
      return new Promise((resolve) => {
        const existing = subscriptions.find(
          (subscription) => subscription.originalName === payload.name
        );
        const key = getNotificationKey(payload.name);
        this.log(
          "STATION_ISSUE",
          `SUBSCRIBE - Name: ${payload.name}; Key: ${key}, StationId: ${
            payload.station._id
          }; StationName: ${payload.station.name}; Unique: ${
            payload.unique
          }; SubscriptionExists: ${!!existing};`
        );

        if (payload.unique && existing) {
          return resolve({ subscription: existing });
        }

        const subscription = {
          originalName: payload.name,
          name: key,
          cb: payload.cb,
        };
        subscriptions.push(subscription);
        return resolve({ subscription });
      });
    }

    /**
     * Removes a notification subscription.
     *
     * @param {Object} payload - the job payload
     * @param {Object} payload.subscription - the subscription object returned by SUBSCRIBE
     * @return {Promise} - resolves once the subscription has been removed; also
     *   resolves (without changing anything) if it was not registered
     */
    REMOVE(payload) {
      return new Promise((resolve) => {
        const index = subscriptions.indexOf(payload.subscription);
        // indexOf signals "not found" with -1. A plain truthiness check would
        // skip index 0 and, for -1, splice off the last (unrelated) entry.
        if (index !== -1) subscriptions.splice(index, 1);
        resolve();
      });
    }

    /**
     * Cancels a pending notification by deleting its Redis key.
     *
     * @param {Object} payload - the job payload
     * @param {String} payload.name - the name of the notification to cancel
     * @return {Promise} - resolves right after the DEL command has been issued,
     *   without waiting for Redis to acknowledge it
     */
    UNSCHEDULE(payload) {
      return new Promise((resolve) => {
        const key = getNotificationKey(payload.name);
        this.log(
          "STATION_ISSUE",
          `UNSCHEDULE - Name: ${payload.name}; Key: ${key}`
        );
        this.pub.del(key);
        resolve();
      });
    }
  }
  // Export a single instance of the module rather than the class itself.
  215. module.exports = new NotificationsModule();