tasks.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. 'use strict';
  2. const cache = require("./cache");
  3. const logger = require("./logger");
  4. const Stations = require("./stations");
  5. const async = require("async");
  6. let utils;
  7. let tasks = {};
  /**
   * Placeholder task used for testing the task runner.
   *
   * Logs a start message, waits ten seconds, logs again and then signals
   * completion.
   *
   * @param {Function} callback - invoked with no arguments once the delay has elapsed
   */
  let testTask = (callback) => {
    const delayMs = 10000;

    // Runs once the delay has elapsed: report and hand control back.
    const finish = () => {
      console.log("Callback");
      callback();
    };

    console.log("Starting task");
    setTimeout(finish, delayMs);
  };
  /**
   * Looks through every cached station and skips the ones whose current song
   * has been playing for longer than its duration.
   *
   * The task is best-effort: a failure while reading the cache is logged and
   * the task still reports completion.
   *
   * @param {Function} callback - invoked with no arguments when the check is finished
   */
  let checkStationSkipTask = (callback) => {
    logger.info("TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
    async.waterfall([
      (next) => {
        cache.hgetall('stations', next);
      },

      (stations, next) => {
        // Nothing came back from the cache: there is nothing to check.
        if (!stations) return next();

        async.each(stations, (station, next2) => {
          // Ignore empty entries, paused stations and stations without a playing song.
          if (!station || station.paused || !station.currentSong || !station.currentSong.title) return next2();

          // NOTE(review): timeElapsed is in milliseconds (Date.now()); confirm that
          // currentSong.duration is stored in the same unit.
          const timeElapsed = Date.now() - station.startedAt - station.timePaused;
          if (timeElapsed <= station.currentSong.duration) return next2();

          logger.error("TASK_STATIONS_SKIP_CHECK", `Skipping ${station._id} as it should have skipped already.`);
          // NOTE(review): the return value of skipStation is discarded here — confirm
          // it performs the skip itself rather than returning something to be invoked.
          Stations.skipStation(station._id);
          next2();
        }, () => {
          next();
        });
      }
    ], (err) => {
      // Do not let a cache failure disappear silently, but never block the task runner.
      if (err) logger.error("TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped failed: ${err}`);
      callback();
    });
  };
  /**
   * Walks the cached `sessions` hash and cleans it up:
   *  - empty (falsy) entries are deleted;
   *  - entries without a `refreshDate` get one set to the current time;
   *  - entries refreshed less than 30 days ago are left alone;
   *  - entries that are 30 days old or older are refreshed when
   *    `utils.socketsFromSessionId` reports at least one socket, and deleted otherwise.
   *
   * The task is best-effort: a failure while reading the cache is logged and
   * the task still reports completion.
   *
   * @param {Function} callback - invoked with no arguments when the clean-up is finished
   */
  let sessionClearingTask = (callback) => {
    // 30 days, in milliseconds.
    const maxSessionAge = 60 * 60 * 24 * 30 * 1000;

    logger.info("TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`, false);
    async.waterfall([
      (next) => {
        cache.hgetall('sessions', next);
      },

      (sessions, next) => {
        if (!sessions) return next();

        async.each(Object.keys(sessions), (sessionId, next2) => {
          const session = sessions[sessionId];

          if (!session) {
            logger.info("TASK_SESSION_CLEAR", 'Removing an empty session.');
            cache.hdel('sessions', sessionId, () => {
              next2();
            });
            return;
          }

          if (!session.refreshDate) {
            session.refreshDate = Date.now();
            cache.hset('sessions', sessionId, session, () => {
              next2();
            });
            return;
          }

          const age = Date.now() - session.refreshDate;
          if (age < maxSessionAge) return next2();

          if (!(age >= maxSessionAge)) {
            // Only reachable when the age is NaN, i.e. refreshDate is not numeric.
            logger.error("TASK_SESSION_CLEAR", "This should never log.");
            return next2();
          }

          // Expired (the exact 30-day boundary counts as expired): keep the
          // session only if it is still in use.
          utils.socketsFromSessionId(session.sessionId, (sockets) => {
            if (sockets.length > 0) {
              session.refreshDate = Date.now();
              cache.hset('sessions', sessionId, session, () => {
                next2();
              });
            } else {
              logger.info("TASK_SESSION_CLEAR", `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`);
              // Delete by the hash key we iterated over, so the entry is removed
              // even when the stored object has no (or a different) sessionId.
              cache.hdel('sessions', sessionId, () => {
                next2();
              });
            }
          });
        }, () => {
          next();
        });
      }
    ], (err) => {
      // Do not let a cache failure disappear silently, but never block the task runner.
      if (err) logger.error("TASK_SESSION_CLEAR", `Checking for sessions to be cleared failed: ${err}`);
      callback();
    });
  };
  88. let initialized = false;
  89. let lockdown = false;
  90. module.exports = {
  91. init: function(cb) {
  92. utils = require('./utils');
  93. this.createTask("testTask", testTask, 5000, true);
  94. this.createTask("stationSkipTask", checkStationSkipTask, 1000 * 60 * 30);
  95. this.createTask("sessionClearTask", sessionClearingTask, 1000 * 60 * 60 * 6);
  96. initialized = true;
  97. if (lockdown) return this._lockdown();
  98. cb();
  99. },
  100. createTask: function(name, fn, timeout, paused = false) {
  101. if (lockdown) return;
  102. tasks[name] = {
  103. name,
  104. fn,
  105. timeout,
  106. lastRan: 0,
  107. timer: null
  108. };
  109. if (!paused) this.handleTask(tasks[name]);
  110. },
  111. pauseTask: (name) => {
  112. if (tasks[name].timer) tasks[name].timer.pause();
  113. },
  114. resumeTask: (name) => {
  115. tasks[name].timer.resume();
  116. },
  117. handleTask: function(task) {
  118. if (lockdown) return;
  119. if (task.timer) task.timer.pause();
  120. task.fn(() => {
  121. task.lastRan = Date.now();
  122. task.timer = new utils.Timer(() => {
  123. this.handleTask(task);
  124. }, task.timeout, false);
  125. });
  126. },
  127. _lockdown: function() {
  128. for (let key in tasks) {
  129. this.pauseTask(key);
  130. }
  131. tasks = {};
  132. lockdown = true;
  133. }
  134. };