tasks.js 7.1 KB


  1. import async from "async";
  2. import fs from "fs";
  3. import path from "path";
  4. import { fileURLToPath } from "url";
  5. import CoreClass from "../core";
  6. import Timer from "../classes/Timer.class";
  // ES modules do not provide __dirname, so derive it from this module's own URL.
  const currentModulePath = fileURLToPath(import.meta.url);
  const __dirname = path.dirname(currentModulePath);
  /**
   * Module that registers named periodic tasks and re-runs each of them on a timer.
   *
   * Every task is stored in `this.tasks` under its name as
   * `{ name, fn, timeout, lastRan, timer }`, where `fn` is called with the module
   * as `this` and must return a promise.
   */
  class TasksModule extends CoreClass {
    constructor() {
      super("tasks");
      // Registered tasks keyed by task name.
      this.tasks = {};
    }

    /**
     * Caches references to the sibling modules the tasks rely on and registers
     * the built-in tasks.
     *
     * The tasks are started in the background: the returned promise resolves
     * without waiting for their first run to finish.
     *
     * @returns {Promise<void>}
     */
    initialize() {
      return new Promise(resolve => {
        const { modules } = this.moduleManager;
        this.cache = modules.cache;
        this.stations = modules.stations;
        this.notifications = modules.notifications;
        this.utils = modules.utils;

        const builtInTasks = [
          { name: "stationSkipTask", fn: this.checkStationSkipTask, timeout: 1000 * 60 * 30 },
          { name: "sessionClearTask", fn: this.sessionClearingTask, timeout: 1000 * 60 * 60 * 6 },
          { name: "logFileSizeCheckTask", fn: this.logFileSizeCheckTask, timeout: 1000 * 60 * 60 }
        ];

        builtInTasks.forEach(definition => {
          // Deliberately not awaited, but never left without a rejection handler.
          this.runJob("CREATE_TASK", definition).catch(err => {
            this.log("ERROR", "TASK_CREATE", `Failed to create task "${definition.name}": ${err?.message ?? err}`);
          });
        });

        resolve();
      });
    }

    /**
     * Registers a task and, unless `payload.paused` is truthy, runs it immediately.
     *
     * @param {object} payload
     * @param {string} payload.name - unique task name
     * @param {Function} payload.fn - promise-returning function, invoked with the module as `this`
     * @param {number} payload.timeout - delay in milliseconds handed to the Timer between runs
     * @param {boolean} [payload.paused] - when truthy the task is only registered, not started
     * @returns {Promise<void>} resolves once the task is registered (and its first run finished, if started)
     */
    async CREATE_TASK(payload) {
      const { name, fn, timeout, paused } = payload;

      // Stop the timer of a task being replaced so the old schedule cannot keep firing.
      const previous = this.tasks[name];
      if (previous && previous.timer) previous.timer.pause();

      this.tasks[name] = { name, fn, timeout, lastRan: 0, timer: null };

      if (!paused) await this.runJob("RUN_TASK", { name });
    }

    /**
     * Pauses the timer of a task, if the task has been scheduled already.
     *
     * @param {object} payload
     * @param {string} payload.name - name of the task to pause
     * @returns {Promise<void>} rejects when no task with that name exists
     */
    async PAUSE_TASK(payload) {
      const task = this.tasks[payload.name];
      if (!task) throw new Error(`Task "${payload.name}" does not exist.`);
      if (task.timer) task.timer.pause();
    }

    /**
     * Resumes the timer of a task, if the task has been scheduled already.
     * A task that never ran has no timer yet, in which case this is a no-op.
     *
     * @param {object} payload
     * @param {string} payload.name - name of the task to resume
     * @returns {Promise<void>} rejects when no task with that name exists
     */
    async RESUME_TASK(payload) {
      const task = this.tasks[payload.name];
      if (!task) throw new Error(`Task "${payload.name}" does not exist.`);
      if (task.timer) task.timer.resume();
    }

    /**
     * Runs a task once and schedules its next run.
     *
     * A failing task is logged and still rescheduled, so a single failure does
     * not silently stop the task for the rest of the process lifetime.
     *
     * @param {object} payload
     * @param {string} payload.name - name of the task to run
     * @returns {Promise<void>} rejects only when no task with that name exists
     */
    async RUN_TASK(payload) {
      const task = this.tasks[payload.name];
      if (!task) throw new Error(`Task "${payload.name}" does not exist.`);

      // Make sure a still-pending timer cannot trigger a second, overlapping run.
      if (task.timer) task.timer.pause();

      try {
        await task.fn.apply(this);
      } catch (err) {
        this.log("ERROR", "TASK_RUN", `Task "${task.name}" failed: ${err?.message ?? err}`);
      }

      task.lastRan = Date.now();
      // NOTE(review): the third Timer argument is presumably the "start paused" flag - confirm in Timer.class.
      task.timer = new Timer(
        () =>
          this.runJob("RUN_TASK", { name: payload.name }).catch(err => {
            this.log("ERROR", "TASK_RUN", `Scheduled run of task "${payload.name}" failed: ${err?.message ?? err}`);
          }),
        task.timeout,
        false
      );
    }

    /**
     * Re-initializes every station whose current song should already have ended.
     *
     * Failures (fetching the stations or re-initializing one of them) are logged
     * and never reject, so the remaining stations are still processed.
     *
     * @returns {Promise<void>}
     */
    async checkStationSkipTask() {
      this.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);

      let stations;
      try {
        stations = await this.cache.runJob("HGETALL", { table: "stations" });
      } catch (err) {
        this.log("ERROR", "TASK_STATIONS_SKIP_CHECK", `Failed to fetch stations: ${err?.message ?? err}`);
        return;
      }
      if (!stations) return;

      await Promise.all(
        Object.values(stations).map(async station => {
          if (!station || station.paused || !station.currentSong || !station.currentSong.title) return;

          const timeElapsed = Date.now() - station.startedAt - station.timePaused;
          if (timeElapsed <= station.currentSong.duration) return;

          this.log(
            "ERROR",
            "TASK_STATIONS_SKIP_CHECK",
            `Skipping ${station._id} as it should have skipped already.`
          );

          try {
            await this.stations.runJob("INITIALIZE_STATION", { stationId: station._id });
          } catch (err) {
            this.log(
              "ERROR",
              "TASK_STATIONS_SKIP_CHECK",
              `Failed to initialize station ${station._id}: ${err?.message ?? err}`
            );
          }
        })
      );
    }

    /**
     * Cleans up the cached sessions: removes empty entries, stamps sessions that
     * have no refresh date, and removes sessions that were not refreshed for 30
     * days unless a socket is still using them (those get refreshed instead).
     *
     * Failures are logged per session and never reject.
     *
     * @returns {Promise<void>}
     */
    async sessionClearingTask() {
      this.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);

      let sessions;
      try {
        sessions = await this.cache.runJob("HGETALL", { table: "sessions" });
      } catch (err) {
        this.log("ERROR", "TASK_SESSION_CLEAR", `Failed to fetch sessions: ${err?.message ?? err}`);
        return;
      }
      if (!sessions) return;

      await Promise.all(
        Object.keys(sessions).map(async sessionId => {
          try {
            await this.clearSessionIfExpired(sessionId, sessions[sessionId]);
          } catch (err) {
            this.log(
              "ERROR",
              "TASK_SESSION_CLEAR",
              `Failed to process session ${sessionId}: ${err?.message ?? err}`
            );
          }
        })
      );
    }

    /**
     * Applies the session clean-up rules to a single cached session.
     *
     * @param {string} sessionId - key of the entry in the "sessions" cache table
     * @param {object|null} session - cached session value, may be empty
     * @returns {Promise<void>} rejects when one of the cache/utils jobs fails
     */
    async clearSessionIfExpired(sessionId, session) {
      const maxSessionAge = 60 * 60 * 24 * 30 * 1000; // 30 days in milliseconds

      if (!session) {
        this.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
        await this.cache.runJob("HDEL", { table: "sessions", key: sessionId });
        return;
      }

      if (!session.refreshDate) {
        session.refreshDate = Date.now();
        await this.cache.runJob("HSET", { table: "sessions", key: sessionId, value: session });
        return;
      }

      // A session aged exactly 30 days is still treated as fresh.
      if (Date.now() - session.refreshDate <= maxSessionAge) return;

      const response = await this.utils.runJob("SOCKETS_FROM_SESSION_ID", {
        sessionId: session.sessionId
      });

      if (response.sockets.length > 0) {
        // Still in use by at least one socket: keep the session and restart its 30 days.
        session.refreshDate = Date.now();
        await this.cache.runJob("HSET", { table: "sessions", key: sessionId, value: session });
        return;
      }

      this.log(
        "INFO",
        "TASK_SESSION_CLEAR",
        `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
      );
      // Delete by the key the entry was read under, so the entry is removed even
      // if the stored object lacks a sessionId field.
      await this.cache.runJob("HDEL", { table: "sessions", key: sessionId });
    }

    /**
     * Logs a prominent warning when one of the known log files is larger than 25MB.
     *
     * Files are checked in order and the check stops at the first oversized file.
     *
     * @returns {Promise<void>} rejects with an Error when a log file cannot be inspected
     */
    async logFileSizeCheckTask() {
      this.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);

      const logFileNames = ["all.log", "debugStation.log", "error.log", "info.log", "success.log"];
      const warningLines = [
        "************************************WARNING*************************************",
        "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************",
        "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****",
        "********************************************************************************"
      ];

      for (const fileName of logFileNames) {
        let stats;
        try {
          // Sequential on purpose: the first problem found ends the check.
          // eslint-disable-next-line no-await-in-loop
          stats = await fs.promises.stat(path.resolve(__dirname, "../../log/", fileName));
        } catch (err) {
          // eslint-disable-next-line no-await-in-loop
          const message = await this.utils.runJob("GET_ERROR", { error: err });
          throw new Error(message);
        }

        const megabytes = stats.size / 1000000;
        if (megabytes > 25) {
          warningLines.forEach(line => this.log("ERROR", "LOGGER_FILE_SIZE_WARNING", line));
          return;
        }
      }
    }
  }
  // The module is exported as a single shared instance.
  const tasksModule = new TasksModule();
  export default tasksModule;