tasks.js 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. import async from "async";
  2. import fs from "fs";
  3. import path from "path";
  4. import { fileURLToPath } from "url";
  5. import CoreClass from "../core";
  6. import Timer from "../classes/Timer.class";
  // ES modules have no built-in __dirname, so derive it from this module's URL.
  const currentFilePath = fileURLToPath(import.meta.url);
  const __dirname = path.dirname(currentFilePath);

  // Module references shared by the task functions in this file.
  // TasksModule is assigned in the class constructor, the others in initialize().
  let TasksModule;
  let CacheModule;
  let StationsModule;
  let UtilsModule;
  /**
   * Turns a rejection reason into a printable message
   *
   * @param {*} err - the error (or any other value) a promise rejected with
   * @returns {string} - the error's message, or the value converted to a string
   */
  function describeError(err) {
      return err && err.message ? err.message : String(err);
  }

  /**
   * Module that registers named tasks and re-runs each of them on its own timer.
   * Every task function must return a promise; the timer for the next run is only
   * created once that promise has settled.
   */
  class _TasksModule extends CoreClass {
      // eslint-disable-next-line require-jsdoc
      constructor() {
          super("tasks");

          // Registered tasks, keyed by task name
          this.tasks = {};

          TasksModule = this;
      }

      /**
       * Initialises the tasks module
       *
       * @returns {Promise} - returns promise (reject, resolve)
       */
      initialize() {
          return new Promise(resolve => {
              CacheModule = this.moduleManager.modules.cache;
              StationsModule = this.moduleManager.modules.stations;
              UtilsModule = this.moduleManager.modules.utils;

              const startupTasks = [
                  {
                      name: "stationSkipTask",
                      fn: TasksModule.checkStationSkipTask,
                      timeout: 1000 * 60 * 30
                  },
                  {
                      name: "sessionClearTask",
                      fn: TasksModule.sessionClearingTask,
                      timeout: 1000 * 60 * 60 * 6
                  },
                  {
                      name: "logFileSizeCheckTask",
                      fn: TasksModule.logFileSizeCheckTask,
                      timeout: 1000 * 60 * 60
                  }
              ];

              startupTasks.forEach(task => {
                  // Deliberately not awaited: initialisation does not wait for the first run of each task.
                  // The catch only makes sure a failure is logged instead of becoming an unhandled rejection.
                  TasksModule.runJob("CREATE_TASK", task).catch(err => {
                      TasksModule.log(
                          "ERROR",
                          "TASKS_INITIALIZE",
                          `Creating task "${task.name}" failed. "${describeError(err)}"`
                      );
                  });
              });

              resolve();
          });
      }

      /**
       * Creates a new task
       *
       * @param {object} payload - object that contains the payload
       * @param {string} payload.name - the name of the task
       * @param {Function} payload.fn - the function the task will run, must return a promise
       * @param {boolean} payload.paused - if true the task is only registered and not run straight away
       * @param {number} payload.timeout - how often to run the task, passed to the Timer as its delay
       * @returns {Promise} - returns promise (reject, resolve)
       */
      CREATE_TASK(payload) {
          return new Promise((resolve, reject) => {
              // When a task is re-created under the same name, stop the old timer so it can't fire next to the new one
              const previousTask = TasksModule.tasks[payload.name];
              if (previousTask && previousTask.timer) previousTask.timer.pause();

              TasksModule.tasks[payload.name] = {
                  name: payload.name,
                  fn: payload.fn,
                  timeout: payload.timeout,
                  lastRan: 0,
                  timer: null
              };

              if (payload.paused) {
                  resolve();
                  return;
              }

              TasksModule.runJob("RUN_TASK", { name: payload.name }, this)
                  .then(() => resolve())
                  .catch(err => reject(err));
          });
      }

      /**
       * Pauses a task
       *
       * @param {object} payload - object that contains the payload
       * @param {string} payload.taskName - the name of the task to pause (payload.name is accepted as well)
       * @returns {Promise} - returns promise (reject, resolve)
       */
      PAUSE_TASK(payload) {
          return new Promise((resolve, reject) => {
              const taskName = payload.taskName || payload.name;
              const task = TasksModule.tasks[taskName];

              if (!task) {
                  reject(new Error(`Task "${taskName}" does not exist.`));
                  return;
              }

              // A task that has never run has no timer yet, so there is nothing to pause
              if (task.timer) task.timer.pause();
              resolve();
          });
      }

      /**
       * Resumes a task
       *
       * @param {object} payload - object that contains the payload
       * @param {string} payload.name - the name of the task to resume (payload.taskName is accepted as well)
       * @returns {Promise} - returns promise (reject, resolve)
       */
      RESUME_TASK(payload) {
          return new Promise((resolve, reject) => {
              const taskName = payload.name || payload.taskName;
              const task = TasksModule.tasks[taskName];

              if (!task) {
                  reject(new Error(`Task "${taskName}" does not exist.`));
                  return;
              }

              // A task that has never run has no timer yet, so there is nothing to resume
              if (task.timer) task.timer.resume();
              resolve();
          });
      }

      /**
       * Runs a task's function and restarts the timer
       *
       * A failing task function is logged and the timer is still restarted, so a single failure doesn't stop the task
       * from ever running again.
       *
       * @param {object} payload - object that contains the payload
       * @param {string} payload.name - the name of the task to run
       * @returns {Promise} - returns promise (reject, resolve), only rejects if the task doesn't exist
       */
      RUN_TASK(payload) {
          return new Promise((resolve, reject) => {
              const task = TasksModule.tasks[payload.name];

              if (!task) {
                  reject(new Error(`Task "${payload.name}" does not exist.`));
                  return;
              }

              if (task.timer) task.timer.pause();

              let run;
              try {
                  // Promise.resolve also covers task functions that return a plain value
                  run = Promise.resolve(task.fn.apply(this));
              } catch (err) {
                  // The task function threw before returning a promise
                  run = Promise.reject(err);
              }

              run.catch(err => {
                  TasksModule.log("ERROR", "TASK_RUN", `Task "${payload.name}" failed. "${describeError(err)}"`);
              }).then(() => {
                  task.lastRan = Date.now();
                  task.timer = new Timer(
                      () =>
                          TasksModule.runJob("RUN_TASK", { name: payload.name }).catch(err => {
                              TasksModule.log(
                                  "ERROR",
                                  "TASK_RUN",
                                  `Re-running task "${payload.name}" failed. "${describeError(err)}"`
                              );
                          }),
                      task.timeout,
                      false
                  );
                  resolve();
              });
          });
      }

      /**
       * Periodically checks if any stations need to be skipped
       *
       * @returns {Promise} - returns promise (reject, resolve), always resolves, failures are only logged
       */
      checkStationSkipTask() {
          return new Promise(resolve => {
              TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);

              async.waterfall(
                  [
                      next => {
                          CacheModule.runJob("HGETALL", { table: "stations" }, this)
                              .then(stations => next(null, stations))
                              .catch(next);
                      },

                      (stations, next) => {
                          if (!stations) return next();

                          return async.each(
                              stations,
                              (station, next2) => {
                                  if (!station || station.paused || !station.currentSong || !station.currentSong.title)
                                      return next2();

                                  // NOTE(review): timeElapsed is in milliseconds; confirm currentSong.duration uses the same unit
                                  const timeElapsed = Date.now() - station.startedAt - station.timePaused;
                                  if (timeElapsed <= station.currentSong.duration) return next2();

                                  TasksModule.log(
                                      "ERROR",
                                      "TASK_STATIONS_SKIP_CHECK",
                                      `Skipping ${station._id} as it should have skipped already.`
                                  );

                                  // Continue with the other stations even if this one fails, otherwise the task never settles
                                  return StationsModule.runJob("INITIALIZE_STATION", { stationId: station._id }, this)
                                      .catch(err => {
                                          TasksModule.log(
                                              "ERROR",
                                              "TASK_STATIONS_SKIP_CHECK",
                                              `Initializing station ${station._id} failed. "${describeError(err)}"`
                                          );
                                      })
                                      .then(() => next2());
                              },
                              () => next()
                          );
                      }
                  ],
                  err => {
                      if (err)
                          TasksModule.log(
                              "ERROR",
                              "TASK_STATIONS_SKIP_CHECK",
                              `Checking for stations to be skipped failed. "${describeError(err)}"`
                          );
                      resolve();
                  }
              );
          });
      }

      /**
       * Periodically checks if any sessions are out of date and need to be cleared
       *
       * Empty sessions are removed, sessions without a refresh date get one, and sessions that haven't been refreshed
       * for 30 days are removed unless a socket is still using them (in which case their refresh date is renewed).
       *
       * @returns {Promise} - returns promise (reject, resolve), always resolves, failures are only logged
       */
      sessionClearingTask() {
          const maxSessionAge = 60 * 60 * 24 * 30 * 1000;

          return new Promise(resolve => {
              TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);

              const saveSession = (sessionId, session) =>
                  CacheModule.runJob("HSET", { table: "sessions", key: sessionId, value: session }, this);

              // Sessions are always removed by the key they are stored under in the sessions table
              const removeSession = sessionId =>
                  CacheModule.runJob("HDEL", { table: "sessions", key: sessionId }, this);

              // Logs a failed job and continues with the next session either way
              const settle = (job, sessionId, done) =>
                  job
                      .catch(err => {
                          TasksModule.log(
                              "ERROR",
                              "TASK_SESSION_CLEAR",
                              `Handling session ${sessionId} failed. "${describeError(err)}"`
                          );
                      })
                      .then(() => done());

              async.waterfall(
                  [
                      next => {
                          CacheModule.runJob("HGETALL", { table: "sessions" }, this)
                              .then(sessions => next(null, sessions))
                              .catch(next);
                      },

                      (sessions, next) => {
                          if (!sessions) return next();

                          return async.each(
                              Object.keys(sessions),
                              (sessionId, next2) => {
                                  const session = sessions[sessionId];

                                  if (!session) {
                                      TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
                                      return settle(removeSession(sessionId), sessionId, next2);
                                  }

                                  if (!session.refreshDate) {
                                      session.refreshDate = Date.now();
                                      return settle(saveSession(sessionId, session), sessionId, next2);
                                  }

                                  // Still fresh; everything from the 30 day mark on is treated as expired
                                  if (Date.now() - session.refreshDate < maxSessionAge) return next2();

                                  const expiryCheck = UtilsModule.runJob(
                                      "SOCKETS_FROM_SESSION_ID",
                                      { sessionId: session.sessionId },
                                      this
                                  ).then(response => {
                                      if (response.sockets.length > 0) {
                                          // Session is still in use, so renew it instead of removing it
                                          session.refreshDate = Date.now();
                                          return saveSession(sessionId, session);
                                      }

                                      TasksModule.log(
                                          "INFO",
                                          "TASK_SESSION_CLEAR",
                                          `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
                                      );
                                      return removeSession(sessionId);
                                  });

                                  return settle(expiryCheck, sessionId, next2);
                              },
                              () => next()
                          );
                      }
                  ],
                  err => {
                      if (err)
                          TasksModule.log(
                              "ERROR",
                              "TASK_SESSION_CLEAR",
                              `Checking for sessions to be cleared failed. "${describeError(err)}"`
                          );
                      resolve();
                  }
              );
          });
      }

      /**
       * Periodically warns about the size of any log files
       *
       * Log files that don't exist are skipped. Any other error while reading a file's size rejects the promise.
       *
       * @returns {Promise} - returns promise (reject, resolve)
       */
      logFileSizeCheckTask() {
          const maxSizeInMb = 25;
          const logFiles = ["all.log", "debugStation.log", "error.log", "info.log", "success.log"];

          return new Promise((resolve, reject) => {
              TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);

              async.each(
                  logFiles,
                  (fileName, next) => {
                      fs.stat(path.resolve(__dirname, "../../log/", fileName), (err, stats) => {
                          if (err) {
                              // A log file that doesn't exist can't be too large, keep checking the other files
                              if (err.code === "ENOENT") return next();
                              return next(err);
                          }

                          // Passing true stops the iteration early, it marks "found a file that is too large"
                          if (stats.size / 1000000 > maxSizeInMb) return next(true);
                          return next();
                      });
                  },
                  err => {
                      if (err && err !== true) {
                          // Fall back to the raw error if it can't be converted, so the promise always settles
                          UtilsModule.runJob("GET_ERROR", { error: err }, this).then(
                              message => reject(new Error(message)),
                              () => reject(err instanceof Error ? err : new Error(String(err)))
                          );
                          return;
                      }

                      if (err === true) {
                          TasksModule.log(
                              "ERROR",
                              "LOGGER_FILE_SIZE_WARNING",
                              "************************************WARNING*************************************"
                          );
                          TasksModule.log(
                              "ERROR",
                              "LOGGER_FILE_SIZE_WARNING",
                              "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"
                          );
                          TasksModule.log(
                              "ERROR",
                              "LOGGER_FILE_SIZE_WARNING",
                              "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"
                          );
                          TasksModule.log(
                              "ERROR",
                              "LOGGER_FILE_SIZE_WARNING",
                              "********************************************************************************"
                          );
                      }

                      resolve();
                  }
              );
          });
      }
  }
  // Create the single module instance; its constructor also stores it in TasksModule.
  const tasksModule = new _TasksModule();

  export default tasksModule;