tasks.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. const CoreClass = require("../core.js");
  2. const tasks = {};
  3. const async = require("async");
  4. const fs = require("fs");
  5. const Timer = require("../classes/Timer.class");
  6. class TasksModule extends CoreClass {
  7. constructor() {
  8. super("tasks");
  9. }
  10. initialize() {
  11. return new Promise((resolve, reject) => {
  12. // return reject(new Error("Not fully migrated yet."));
  13. this.cache = this.moduleManager.modules["cache"];
  14. this.stations = this.moduleManager.modules["stations"];
  15. this.notifications = this.moduleManager.modules["notifications"];
  16. this.utils = this.moduleManager.modules["utils"];
  17. //this.createTask("testTask", testTask, 5000, true);
  18. this.runJob("CREATE_TASK", {
  19. name: "stationSkipTask",
  20. fn: this.checkStationSkipTask,
  21. timeout: 1000 * 60 * 30,
  22. });
  23. this.runJob("CREATE_TASK", {
  24. name: "sessionClearTask",
  25. fn: this.sessionClearingTask,
  26. timeout: 1000 * 60 * 60 * 6,
  27. });
  28. this.runJob("CREATE_TASK", {
  29. name: "logFileSizeCheckTask",
  30. fn: this.logFileSizeCheckTask,
  31. timeout: 1000 * 60 * 60,
  32. });
  33. resolve();
  34. });
  35. }
  36. CREATE_TASK(payload) {
  37. return new Promise((resolve, reject) => {
  38. tasks[payload.name] = {
  39. name: payload.name,
  40. fn: payload.fn,
  41. timeout: payload.timeout,
  42. lastRan: 0,
  43. timer: null,
  44. };
  45. if (!payload.paused) {
  46. this.runJob("RUN_TASK", { name: payload.name })
  47. .then(() => resolve())
  48. .catch((err) => reject(err));
  49. } else resolve();
  50. });
  51. }
  52. PAUSE_TASK(payload) {
  53. return new Promise((resolve, reject) => {
  54. if (tasks[payload.name].timer) tasks[name].timer.pause();
  55. resolve();
  56. });
  57. }
  58. RESUME_TASK(payload) {
  59. return new Promise((resolve, reject) => {
  60. tasks[payload.name].timer.resume();
  61. resolve();
  62. });
  63. }
  64. RUN_TASK(payload) {
  65. return new Promise((resolve, reject) => {
  66. const task = tasks[payload.name];
  67. if (task.timer) task.timer.pause();
  68. task.fn.apply(this).then(() => {
  69. task.lastRan = Date.now();
  70. task.timer = new Timer(
  71. () => {
  72. this.runJob("RUN_TASK", { name: payload.name });
  73. },
  74. task.timeout,
  75. false
  76. );
  77. resolve();
  78. });
  79. });
  80. }
  81. checkStationSkipTask(callback) {
  82. return new Promise((resolve, reject) => {
  83. this.log(
  84. "INFO",
  85. "TASK_STATIONS_SKIP_CHECK",
  86. `Checking for stations to be skipped.`,
  87. false
  88. );
  89. async.waterfall(
  90. [
  91. (next) => {
  92. this.cache
  93. .runJob("HGETALL", {
  94. table: "stations",
  95. })
  96. .then((response) => next(null, response))
  97. .catch(next);
  98. },
  99. (stations, next) => {
  100. async.each(
  101. stations,
  102. (station, next2) => {
  103. if (
  104. station.paused ||
  105. !station.currentSong ||
  106. !station.currentSong.title
  107. )
  108. return next2();
  109. const timeElapsed =
  110. Date.now() -
  111. station.startedAt -
  112. station.timePaused;
  113. if (timeElapsed <= station.currentSong.duration)
  114. return next2();
  115. else {
  116. this.log(
  117. "ERROR",
  118. "TASK_STATIONS_SKIP_CHECK",
  119. `Skipping ${station._id} as it should have skipped already.`
  120. );
  121. this.stations
  122. .runJob("INITIALIZE_STATION", {
  123. stationId: station._id,
  124. })
  125. .then(() => {
  126. next2();
  127. });
  128. }
  129. },
  130. () => {
  131. next();
  132. }
  133. );
  134. },
  135. ],
  136. () => {
  137. resolve();
  138. }
  139. );
  140. });
  141. }
  142. sessionClearingTask() {
  143. return new Promise((resolve, reject) => {
  144. this.log(
  145. "INFO",
  146. "TASK_SESSION_CLEAR",
  147. `Checking for sessions to be cleared.`
  148. );
  149. async.waterfall(
  150. [
  151. (next) => {
  152. this.cache
  153. .runJob("HGETALL", {
  154. table: "sessions",
  155. })
  156. .then((sessions) => {
  157. next(null, sessions);
  158. })
  159. .catch(next);
  160. },
  161. (sessions, next) => {
  162. if (!sessions) return next();
  163. let keys = Object.keys(sessions);
  164. async.each(
  165. keys,
  166. (sessionId, next2) => {
  167. let session = sessions[sessionId];
  168. if (
  169. session &&
  170. session.refreshDate &&
  171. Date.now() - session.refreshDate <
  172. 60 * 60 * 24 * 30 * 1000
  173. )
  174. return next2();
  175. if (!session) {
  176. this.log(
  177. "INFO",
  178. "TASK_SESSION_CLEAR",
  179. "Removing an empty session."
  180. );
  181. this.cache
  182. .runJob("HDEL", {
  183. table: "sessions",
  184. key: sessionId,
  185. })
  186. .finally(() => next2());
  187. } else if (!session.refreshDate) {
  188. session.refreshDate = Date.now();
  189. this.cache
  190. .runJob("HSET", {
  191. table: "sessions",
  192. key: sessionId,
  193. value: session,
  194. })
  195. .finally(() => next2());
  196. } else if (
  197. Date.now() - session.refreshDate >
  198. 60 * 60 * 24 * 30 * 1000
  199. ) {
  200. this.utils
  201. .runJob("SOCKETS_FROM_SESSION_ID", {
  202. sessionId: session.sessionId,
  203. })
  204. .then((response) => {
  205. if (response.sockets.length > 0) {
  206. session.refreshDate = Date.now();
  207. this.cache
  208. .runJob("HSET", {
  209. table: "sessions",
  210. key: sessionId,
  211. value: session,
  212. })
  213. .finally(() => next2());
  214. } else {
  215. this.log(
  216. "INFO",
  217. "TASK_SESSION_CLEAR",
  218. `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
  219. );
  220. this.cache
  221. .runJob("HDEL", {
  222. table: "sessions",
  223. key: session.sessionId,
  224. })
  225. .finally(() => next2());
  226. }
  227. });
  228. } else {
  229. this.log(
  230. "ERROR",
  231. "TASK_SESSION_CLEAR",
  232. "This should never log."
  233. );
  234. next2();
  235. }
  236. },
  237. () => {
  238. next();
  239. }
  240. );
  241. },
  242. ],
  243. () => {
  244. resolve();
  245. }
  246. );
  247. });
  248. }
  249. logFileSizeCheckTask() {
  250. return new Promise((resolve, reject) => {
  251. this.log(
  252. "INFO",
  253. "TASK_LOG_FILE_SIZE_CHECK",
  254. `Checking the size for the log files.`
  255. );
  256. async.each(
  257. [
  258. "all.log",
  259. "debugStation.log",
  260. "error.log",
  261. "info.log",
  262. "success.log",
  263. ],
  264. (fileName, next) => {
  265. const stats = fs.statSync(
  266. `${__dirname}/../../log/${fileName}`
  267. );
  268. const mb = stats.size / 1000000;
  269. if (mb > 25) return next(true);
  270. else next();
  271. },
  272. (err) => {
  273. if (err === true) {
  274. this.log(
  275. "ERROR",
  276. "LOGGER_FILE_SIZE_WARNING",
  277. "************************************WARNING*************************************"
  278. );
  279. this.log(
  280. "ERROR",
  281. "LOGGER_FILE_SIZE_WARNING",
  282. "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"
  283. );
  284. this.log(
  285. "ERROR",
  286. "LOGGER_FILE_SIZE_WARNING",
  287. "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"
  288. );
  289. this.log(
  290. "ERROR",
  291. "LOGGER_FILE_SIZE_WARNING",
  292. "********************************************************************************"
  293. );
  294. }
  295. resolve();
  296. }
  297. );
  298. });
  299. }
  300. }
  301. module.exports = new TasksModule();