tasks.js 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. 'use strict';
  2. const cache = require("./cache");
  3. const logger = require("./logger");
  4. const Stations = require("./stations");
  5. const notifications = require("./notifications");
  6. const async = require("async");
  7. let utils;
  8. let tasks = {};
  9. let testTask = (callback) => {
  10. //Stuff
  11. console.log("Starting task");
  12. setTimeout(() => {
  13. console.log("Callback");
  14. callback();
  15. }, 10000);
  16. };
  17. let checkStationSkipTask = (callback) => {
  18. logger.info("TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
  19. async.waterfall([
  20. (next) => {
  21. cache.hgetall('stations', next);
  22. },
  23. (stations, next) => {
  24. async.each(stations, (station, next2) => {
  25. if (station.paused || !station.currentSong || !station.currentSong.title) return next2();
  26. const timeElapsed = Date.now() - station.startedAt - station.timePaused;
  27. if (timeElapsed <= station.currentSong.duration) return next2();
  28. else {
  29. logger.error("TASK_STATIONS_SKIP_CHECK", `Skipping ${station._id} as it should have skipped already.`);
  30. Stations.initializeStation(station._id);
  31. next2();
  32. }
  33. }, () => {
  34. next();
  35. });
  36. }
  37. ], () => {
  38. callback();
  39. });
  40. };
  41. let sessionClearingTask = (callback) => {
  42. logger.info("TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`, false);
  43. async.waterfall([
  44. (next) => {
  45. cache.hgetall('sessions', next);
  46. },
  47. (sessions, next) => {
  48. if (!sessions) return next();
  49. let keys = Object.keys(sessions);
  50. async.each(keys, (sessionId, next2) => {
  51. let session = sessions[sessionId];
  52. if (session && session.refreshDate && (Date.now() - session.refreshDate) < (60 * 60 * 24 * 30 * 1000)) return next2();
  53. if (!session) {
  54. logger.info("TASK_SESSION_CLEAR", 'Removing an empty session.');
  55. cache.hdel('sessions', sessionId, () => {
  56. next2();
  57. });
  58. } else if (!session.refreshDate) {
  59. session.refreshDate = Date.now();
  60. cache.hset('sessions', sessionId, session, () => {
  61. next2();
  62. });
  63. } else if ((Date.now() - session.refreshDate) > (60 * 60 * 24 * 30 * 1000)) {
  64. utils.socketsFromSessionId(session.sessionId, (sockets) => {
  65. if (sockets.length > 0) {
  66. session.refreshDate = Date.now();
  67. cache.hset('sessions', sessionId, session, () => {
  68. next2()
  69. });
  70. } else {
  71. logger.info("TASK_SESSION_CLEAR", `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`);
  72. cache.hdel('sessions', session.sessionId, () => {
  73. next2();
  74. });
  75. }
  76. });
  77. } else {
  78. logger.error("TASK_SESSION_CLEAR", "This should never log.");
  79. next2();
  80. }
  81. }, () => {
  82. next();
  83. });
  84. }
  85. ], () => {
  86. callback();
  87. });
  88. };
  89. let initialized = false;
  90. let lockdown = false;
  91. module.exports = {
  92. init: function(cb) {
  93. utils = require('./utils');
  94. this.createTask("testTask", testTask, 5000, true);
  95. this.createTask("stationSkipTask", checkStationSkipTask, 1000 * 60 * 30);
  96. this.createTask("sessionClearTask", sessionClearingTask, 1000 * 60 * 60 * 6);
  97. initialized = true;
  98. if (lockdown) return this._lockdown();
  99. cb();
  100. },
  101. createTask: function(name, fn, timeout, paused = false) {
  102. if (lockdown) return;
  103. tasks[name] = {
  104. name,
  105. fn,
  106. timeout,
  107. lastRan: 0,
  108. timer: null
  109. };
  110. if (!paused) this.handleTask(tasks[name]);
  111. },
  112. pauseTask: (name) => {
  113. if (tasks[name].timer) tasks[name].timer.pause();
  114. },
  115. resumeTask: (name) => {
  116. tasks[name].timer.resume();
  117. },
  118. handleTask: function(task) {
  119. if (lockdown) return;
  120. if (task.timer) task.timer.pause();
  121. task.fn(() => {
  122. task.lastRan = Date.now();
  123. task.timer = new utils.Timer(() => {
  124. this.handleTask(task);
  125. }, task.timeout, false);
  126. });
  127. },
  128. _lockdown: function() {
  129. for (let key in tasks) {
  130. this.pauseTask(key);
  131. }
  132. tasks = {};
  133. lockdown = true;
  134. }
  135. };