tasks.js 8.8 KB


  1. import async from "async";
  2. import fs from "fs";
  3. import path from "path";
  4. import { fileURLToPath } from "url";
  5. import CoreClass from "../core";
  6. import Timer from "../classes/Timer.class";
  7. const __dirname = path.dirname(fileURLToPath(import.meta.url));
  8. let TasksModule;
  9. let CacheModule;
  10. let StationsModule;
  11. let UtilsModule;
  12. let IOModule;
  13. class _TasksModule extends CoreClass {
  14. // eslint-disable-next-line require-jsdoc
  15. constructor() {
  16. super("tasks");
  17. this.tasks = {};
  18. TasksModule = this;
  19. }
  20. /**
  21. * Initialises the tasks module
  22. *
  23. * @returns {Promise} - returns promise (reject, resolve)
  24. */
  25. initialize() {
  26. return new Promise(resolve => {
  27. // return reject(new Error("Not fully migrated yet."));
  28. CacheModule = this.moduleManager.modules.cache;
  29. StationsModule = this.moduleManager.modules.stations;
  30. UtilsModule = this.moduleManager.modules.utils;
  31. IOModule = this.moduleManager.modules.io;
  32. // this.createTask("testTask", testTask, 5000, true);
  33. TasksModule.runJob("CREATE_TASK", {
  34. name: "stationSkipTask",
  35. fn: TasksModule.checkStationSkipTask,
  36. timeout: 1000 * 60 * 30
  37. });
  38. TasksModule.runJob("CREATE_TASK", {
  39. name: "sessionClearTask",
  40. fn: TasksModule.sessionClearingTask,
  41. timeout: 1000 * 60 * 60 * 6
  42. });
  43. TasksModule.runJob("CREATE_TASK", {
  44. name: "logFileSizeCheckTask",
  45. fn: TasksModule.logFileSizeCheckTask,
  46. timeout: 1000 * 60 * 60
  47. });
  48. resolve();
  49. });
  50. }
  51. /**
  52. * Creates a new task
  53. *
  54. * @param {object} payload - object that contains the payload
  55. * @param {string} payload.name - the name of the task
  56. * @param {string} payload.fn - the function the task will run
  57. * @param {string} payload.paused - if the task is currently paused
  58. * @param {boolean} payload.timeout - how often to run the task
  59. * @returns {Promise} - returns promise (reject, resolve)
  60. */
  61. CREATE_TASK(payload) {
  62. return new Promise((resolve, reject) => {
  63. TasksModule.tasks[payload.name] = {
  64. name: payload.name,
  65. fn: payload.fn,
  66. timeout: payload.timeout,
  67. lastRan: 0,
  68. timer: null
  69. };
  70. if (!payload.paused) {
  71. TasksModule.runJob("RUN_TASK", { name: payload.name }, this)
  72. .then(() => resolve())
  73. .catch(err => reject(err));
  74. } else resolve();
  75. });
  76. }
  77. /**
  78. * Pauses a task
  79. *
  80. * @param {object} payload - object that contains the payload
  81. * @param {string} payload.taskName - the name of the task to pause
  82. * @returns {Promise} - returns promise (reject, resolve)
  83. */
  84. PAUSE_TASK(payload) {
  85. const taskName = { payload };
  86. return new Promise(resolve => {
  87. if (TasksModule.tasks[taskName].timer) TasksModule.tasks[taskName].timer.pause();
  88. resolve();
  89. });
  90. }
  91. /**
  92. * Resumes a task
  93. *
  94. * @param {object} payload - object that contains the payload
  95. * @param {string} payload.name - the name of the task to resume
  96. * @returns {Promise} - returns promise (reject, resolve)
  97. */
  98. RESUME_TASK(payload) {
  99. return new Promise(resolve => {
  100. TasksModule.tasks[payload.name].timer.resume();
  101. resolve();
  102. });
  103. }
  104. /**
  105. * Runs a task's function and restarts the timer
  106. *
  107. * @param {object} payload - object that contains the payload
  108. * @param {string} payload.name - the name of the task to run
  109. * @returns {Promise} - returns promise (reject, resolve)
  110. */
  111. RUN_TASK(payload) {
  112. return new Promise(resolve => {
  113. const task = TasksModule.tasks[payload.name];
  114. if (task.timer) task.timer.pause();
  115. task.fn.apply(null).then(() => {
  116. task.lastRan = Date.now();
  117. task.timer = new Timer(
  118. () => TasksModule.runJob("RUN_TASK", { name: payload.name }),
  119. task.timeout,
  120. false
  121. );
  122. resolve();
  123. });
  124. });
  125. }
  126. /**
  127. * Periodically checks if any stations need to be skipped
  128. *
  129. * @returns {Promise} - returns promise (reject, resolve)
  130. */
  131. checkStationSkipTask() {
  132. return new Promise(resolve => {
  133. TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
  134. async.waterfall(
  135. [
  136. next => {
  137. CacheModule.runJob("HGETALL", { table: "stations" })
  138. .then(response => next(null, response))
  139. .catch(next);
  140. },
  141. (stations, next) => {
  142. async.each(
  143. stations,
  144. (station, next2) => {
  145. if (station.paused || !station.currentSong || !station.currentSong.title)
  146. return next2();
  147. const timeElapsed = Date.now() - station.startedAt - station.timePaused;
  148. if (timeElapsed <= station.currentSong.duration) return next2();
  149. TasksModule.log(
  150. "ERROR",
  151. "TASK_STATIONS_SKIP_CHECK",
  152. `Skipping ${station._id} as it should have skipped already.`
  153. );
  154. return StationsModule.runJob("INITIALIZE_STATION", {
  155. stationId: station._id
  156. }).then(() => next2());
  157. },
  158. () => next()
  159. );
  160. }
  161. ],
  162. () => resolve()
  163. );
  164. });
  165. }
  166. /**
  167. * Periodically checks if any sessions are out of date and need to be cleared
  168. *
  169. * @returns {Promise} - returns promise (reject, resolve)
  170. */
  171. sessionClearingTask() {
  172. return new Promise(resolve => {
  173. TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);
  174. async.waterfall(
  175. [
  176. next => {
  177. CacheModule.runJob("HGETALL", { table: "sessions" })
  178. .then(sessions => next(null, sessions))
  179. .catch(next);
  180. },
  181. (sessions, next) => {
  182. if (!sessions) return next();
  183. const keys = Object.keys(sessions);
  184. return async.each(
  185. keys,
  186. (sessionId, next2) => {
  187. const session = sessions[sessionId];
  188. if (
  189. session &&
  190. session.refreshDate &&
  191. Date.now() - session.refreshDate < 60 * 60 * 24 * 30 * 1000
  192. )
  193. return next2();
  194. if (!session) {
  195. TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
  196. return CacheModule.runJob("HDEL", {
  197. table: "sessions",
  198. key: sessionId
  199. }).finally(() => {
  200. next2();
  201. });
  202. }
  203. if (!session.refreshDate) {
  204. session.refreshDate = Date.now();
  205. return CacheModule.runJob("HSET", {
  206. table: "sessions",
  207. key: sessionId,
  208. value: session
  209. }).finally(() => next2());
  210. }
  211. if (Date.now() - session.refreshDate > 60 * 60 * 24 * 30 * 1000) {
  212. return IOModule.runJob("SOCKETS_FROM_SESSION_ID", {
  213. sessionId: session.sessionId
  214. }).then(response => {
  215. if (response.sockets.length > 0) {
  216. session.refreshDate = Date.now();
  217. CacheModule.runJob("HSET", {
  218. table: "sessions",
  219. key: sessionId,
  220. value: session
  221. }).finally(() => {
  222. next2();
  223. });
  224. } else {
  225. TasksModule.log(
  226. "INFO",
  227. "TASK_SESSION_CLEAR",
  228. `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
  229. );
  230. CacheModule.runJob("HDEL", {
  231. table: "sessions",
  232. key: session.sessionId
  233. }).finally(() => next2());
  234. }
  235. });
  236. }
  237. TasksModule.log("ERROR", "TASK_SESSION_CLEAR", "This should never log.");
  238. return next2();
  239. },
  240. () => next()
  241. );
  242. }
  243. ],
  244. () => resolve()
  245. );
  246. });
  247. }
  248. /**
  249. * Periodically warns about the size of any log files
  250. *
  251. * @returns {Promise} - returns promise (reject, resolve)
  252. */
  253. logFileSizeCheckTask() {
  254. return new Promise((resolve, reject) => {
  255. TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);
  256. async.each(
  257. ["all.log", "debugStation.log", "error.log", "info.log", "success.log"],
  258. (fileName, next) => {
  259. try {
  260. const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));
  261. const mb = stats.size / 1000000;
  262. if (mb > 25) return next(true);
  263. return next();
  264. } catch (err) {
  265. return next(err);
  266. }
  267. },
  268. async err => {
  269. if (err && err !== true) {
  270. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  271. return reject(new Error(err));
  272. }
  273. if (err === true) {
  274. TasksModule.log(
  275. "ERROR",
  276. "LOGGER_FILE_SIZE_WARNING",
  277. "************************************WARNING*************************************"
  278. );
  279. TasksModule.log(
  280. "ERROR",
  281. "LOGGER_FILE_SIZE_WARNING",
  282. "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"
  283. );
  284. TasksModule.log(
  285. "ERROR",
  286. "LOGGER_FILE_SIZE_WARNING",
  287. "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"
  288. );
  289. TasksModule.log(
  290. "ERROR",
  291. "LOGGER_FILE_SIZE_WARNING",
  292. "********************************************************************************"
  293. );
  294. }
  295. return resolve();
  296. }
  297. );
  298. });
  299. }
  300. }
  301. export default new _TasksModule();