// tasks.js
  'use strict';

  const coreClass = require("../core");
  const async = require("async");

  // Registry of scheduled tasks, keyed by task name. Entries are added by
  // createTask(); the binding itself is never reassigned, only its properties.
  const tasks = {};
  5. module.exports = class extends coreClass {
  6. initialize() {
  7. return new Promise((resolve, reject) => {
  8. this.setStage(1);
  9. this.cache = this.moduleManager.modules["cache"];
  10. this.stations = this.moduleManager.modules["stations"];
  11. this.notifications = this.moduleManager.modules["notifications"];
  12. this.utils = this.moduleManager.modules["utils"];
  13. //this.createTask("testTask", testTask, 5000, true);
  14. this.createTask("stationSkipTask", this.checkStationSkipTask, 1000 * 60 * 30);
  15. this.createTask("sessionClearTask", this.sessionClearingTask, 1000 * 60 * 60 * 6);
  16. resolve();
  17. });
  18. }
  19. async createTask(name, fn, timeout, paused = false) {
  20. try { await this._validateHook(); } catch { return; }
  21. tasks[name] = {
  22. name,
  23. fn,
  24. timeout,
  25. lastRan: 0,
  26. timer: null
  27. };
  28. if (!paused) this.handleTask(tasks[name]);
  29. }
  30. async pauseTask(name) {
  31. try { await this._validateHook(); } catch { return; }
  32. if (tasks[name].timer) tasks[name].timer.pause();
  33. }
  34. async resumeTask(name) {
  35. try { await this._validateHook(); } catch { return; }
  36. tasks[name].timer.resume();
  37. }
  38. async handleTask(task) {
  39. try { await this._validateHook(); } catch { return; }
  40. if (task.timer) task.timer.pause();
  41. task.fn(() => {
  42. task.lastRan = Date.now();
  43. task.timer = new utils.Timer(() => {
  44. this.handleTask(task);
  45. }, task.timeout, false);
  46. });
  47. }
  48. /*testTask(callback) {
  49. //Stuff
  50. console.log("Starting task");
  51. setTimeout(() => {
  52. console.log("Callback");
  53. callback();
  54. }, 10000);
  55. }*/
  56. async checkStationSkipTask(callback) {
  57. try { await this._validateHook(); } catch { return; }
  58. this.logger.info("TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
  59. async.waterfall([
  60. (next) => {
  61. this.cache.hgetall('stations', next);
  62. },
  63. (stations, next) => {
  64. async.each(stations, (station, next2) => {
  65. if (station.paused || !station.currentSong || !station.currentSong.title) return next2();
  66. const timeElapsed = Date.now() - station.startedAt - station.timePaused;
  67. if (timeElapsed <= station.currentSong.duration) return next2();
  68. else {
  69. this.logger.error("TASK_STATIONS_SKIP_CHECK", `Skipping ${station._id} as it should have skipped already.`);
  70. this.stations.initializeStation(station._id);
  71. next2();
  72. }
  73. }, () => {
  74. next();
  75. });
  76. }
  77. ], () => {
  78. callback();
  79. });
  80. }
  81. async sessionClearingTask(callback) {
  82. try { await this._validateHook(); } catch { return; }
  83. this.logger.info("TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`, false);
  84. async.waterfall([
  85. (next) => {
  86. this.cache.hgetall('sessions', next);
  87. },
  88. (sessions, next) => {
  89. if (!sessions) return next();
  90. let keys = Object.keys(sessions);
  91. async.each(keys, (sessionId, next2) => {
  92. let session = sessions[sessionId];
  93. if (session && session.refreshDate && (Date.now() - session.refreshDate) < (60 * 60 * 24 * 30 * 1000)) return next2();
  94. if (!session) {
  95. this.logger.info("TASK_SESSION_CLEAR", 'Removing an empty session.');
  96. this.cache.hdel('sessions', sessionId, () => {
  97. next2();
  98. });
  99. } else if (!session.refreshDate) {
  100. session.refreshDate = Date.now();
  101. this.cache.hset('sessions', sessionId, session, () => {
  102. next2();
  103. });
  104. } else if ((Date.now() - session.refreshDate) > (60 * 60 * 24 * 30 * 1000)) {
  105. this.utils.socketsFromSessionId(session.sessionId, (sockets) => {
  106. if (sockets.length > 0) {
  107. session.refreshDate = Date.now();
  108. this.cache.hset('sessions', sessionId, session, () => {
  109. next2()
  110. });
  111. } else {
  112. this.logger.info("TASK_SESSION_CLEAR", `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`);
  113. this.cache.hdel('sessions', session.sessionId, () => {
  114. next2();
  115. });
  116. }
  117. });
  118. } else {
  119. this.logger.error("TASK_SESSION_CLEAR", "This should never log.");
  120. next2();
  121. }
  122. }, () => {
  123. next();
  124. });
  125. }
  126. ], () => {
  127. callback();
  128. });
  129. }
  130. }