tasks.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  // Base class that TasksModule extends.
  const CoreClass = require("../core.js");

  // Registry of task records, keyed by task name; filled in by CREATE_TASK.
  const tasks = {};

  // Third-party control-flow helpers (`async.each`, `async.waterfall`).
  const async = require("async");

  // Node built-in, used to stat the log files.
  const fs = require("fs");

  // Project timer class used to schedule the next run of each task.
  const Timer = require("../classes/Timer.class");
  /**
   * Module that registers and repeatedly runs background maintenance tasks:
   * a station skip check, a stale-session sweep and a log-file size check.
   *
   * Task records live in the module-level `tasks` registry. The upper-case
   * methods are jobs invoked through `this.runJob(NAME, payload)`; each one
   * returns a promise that settles when the job is done.
   */
  class TasksModule extends CoreClass {
    constructor() {
      super("tasks");
    }

    /**
     * Turns any thrown/rejected value into a printable message.
     *
     * @param {*} err - an Error, a string, or any other rejection value
     * @returns {string} `err.message` when present, otherwise `String(err)`
     */
    static describeError(err) {
      return err && err.message ? err.message : String(err);
    }

    /**
     * Looks up a registered task record.
     *
     * @param {string} name - task name as passed to CREATE_TASK
     * @returns {object} the task record stored in the registry
     * @throws {Error} if no task with that name has been created
     */
    static findTask(name) {
      // Own-property check so names such as "constructor" do not resolve to
      // something inherited from Object.prototype.
      if (!Object.prototype.hasOwnProperty.call(tasks, name)) {
        throw new Error(`Task "${name}" does not exist.`);
      }
      return tasks[name];
    }

    /**
     * Caches references to the sibling modules this module uses and starts
     * the three built-in tasks. Task creation is started but not awaited, so
     * the returned promise resolves without waiting for the first runs.
     *
     * @returns {Promise<void>}
     */
    async initialize() {
      const { modules } = this.moduleManager;
      this.cache = modules["cache"];
      this.stations = modules["stations"];
      this.notifications = modules["notifications"];
      this.utils = modules["utils"];

      // Timeouts are presumably milliseconds (30 min, 6 h, 1 h) - the unit
      // is defined by the Timer class; confirm there.
      const definitions = [
        {
          name: "stationSkipTask",
          fn: this.checkStationSkipTask,
          timeout: 1000 * 60 * 30,
        },
        {
          name: "sessionClearTask",
          fn: this.sessionClearingTask,
          timeout: 1000 * 60 * 60 * 6,
        },
        {
          name: "logFileSizeCheckTask",
          fn: this.logFileSizeCheckTask,
          timeout: 1000 * 60 * 60,
        },
      ];

      for (const definition of definitions) {
        // Deliberately not awaited; the catch keeps a failing first run from
        // surfacing as an unhandled promise rejection.
        this.runJob("CREATE_TASK", definition).catch((err) => {
          this.log(
            "ERROR",
            "TASKS_INITIALIZE",
            `Failed to start task "${definition.name}": ${TasksModule.describeError(err)}`
          );
        });
      }
    }

    /**
     * Registers a task and, unless `payload.paused` is truthy, runs it once
     * immediately (which also schedules its next run).
     *
     * @param {object} payload
     * @param {string} payload.name - unique task name (registry key)
     * @param {Function} payload.fn - task body; called with `this` set to
     *   the module and expected to return a promise
     * @param {number} payload.timeout - delay handed to Timer between runs
     * @param {boolean} [payload.paused] - when truthy, only register the task
     * @returns {Promise<void>} rejects if the first run fails
     */
    async CREATE_TASK(payload) {
      tasks[payload.name] = {
        name: payload.name,
        fn: payload.fn,
        timeout: payload.timeout,
        lastRan: 0,
        timer: null,
      };

      if (!payload.paused) {
        await this.runJob("RUN_TASK", { name: payload.name });
      }
    }

    /**
     * Pauses the timer of a task, if it has one.
     *
     * @param {object} payload
     * @param {string} payload.name - task name
     * @returns {Promise<void>} rejects if the task does not exist
     */
    async PAUSE_TASK(payload) {
      // The original indexed the registry with an undeclared `name`
      // variable here, which threw a ReferenceError whenever a timer existed.
      const task = TasksModule.findTask(payload.name);
      if (task.timer) {
        task.timer.pause();
      }
    }

    /**
     * Resumes the timer of a task.
     *
     * @param {object} payload
     * @param {string} payload.name - task name
     * @returns {Promise<void>} rejects if the task does not exist or has no
     *   timer yet (it has never completed a run)
     */
    async RESUME_TASK(payload) {
      const task = TasksModule.findTask(payload.name);
      if (!task.timer) {
        throw new Error(`Task "${payload.name}" has no timer to resume.`);
      }
      task.timer.resume();
    }

    /**
     * Runs a task once and, when it succeeds, schedules the next run.
     *
     * If the task body rejects, this job rejects with the same reason and no
     * new timer is created, so the task stays stopped until RUN_TASK is
     * invoked again.
     *
     * @param {object} payload
     * @param {string} payload.name - task name
     * @returns {Promise<void>}
     */
    async RUN_TASK(payload) {
      const task = TasksModule.findTask(payload.name);

      // Stop any pending scheduled run so it cannot fire while this one is
      // in progress.
      if (task.timer) {
        task.timer.pause();
      }

      // Awaiting here propagates a task failure to the caller; the original
      // had no rejection handler, so a failure left this job pending forever.
      await task.fn.apply(this);

      task.lastRan = Date.now();
      task.timer = new Timer(
        () => {
          this.runJob("RUN_TASK", { name: payload.name }).catch((err) => {
            this.log(
              "ERROR",
              "TASKS_RUN_TASK",
              `Scheduled run of task "${payload.name}" failed: ${TasksModule.describeError(err)}`
            );
          });
        },
        task.timeout,
        false // third Timer argument kept as in the original; presumably "start paused" - confirm in Timer.class
      );
    }

    /**
     * Task body: re-initializes every station whose current song should
     * already have ended. Failures are logged, never thrown, so one bad
     * station cannot stall the task.
     *
     * @param {Function} [callback] - unused; kept so the signature is unchanged
     * @returns {Promise<void>}
     */
    async checkStationSkipTask(callback) {
      this.log(
        "INFO",
        "TASK_STATIONS_SKIP_CHECK",
        `Checking for stations to be skipped.`,
        false
      );

      let stations;
      try {
        stations = await this.cache.runJob("HGETALL", { table: "stations" });
      } catch (err) {
        this.log(
          "ERROR",
          "TASK_STATIONS_SKIP_CHECK",
          `Could not load stations: ${TasksModule.describeError(err)}`
        );
        return;
      }

      // Assumes HGETALL yields a plain object (or array) of stations, as it
      // does for sessions below; a missing result means nothing to check.
      const stationList = stations ? Object.values(stations) : [];

      await Promise.all(
        stationList.map((station) =>
          this.skipStationIfOverdue(station).catch((err) => {
            this.log(
              "ERROR",
              "TASK_STATIONS_SKIP_CHECK",
              `Could not skip station ${station && station._id}: ${TasksModule.describeError(err)}`
            );
          })
        )
      );
    }

    /**
     * Re-initializes one station if it is playing a song that has run past
     * its duration.
     *
     * @param {object} station - station record from the cache
     * @returns {Promise<void>} rejects if INITIALIZE_STATION fails
     */
    async skipStationIfOverdue(station) {
      if (
        station.paused ||
        !station.currentSong ||
        !station.currentSong.title
      ) {
        return;
      }

      // NOTE(review): timeElapsed is in milliseconds; this comparison only
      // makes sense if currentSong.duration is too - confirm the unit.
      const timeElapsed =
        Date.now() - station.startedAt - station.timePaused;
      if (timeElapsed <= station.currentSong.duration) {
        return;
      }

      this.log(
        "ERROR",
        "TASK_STATIONS_SKIP_CHECK",
        `Skipping ${station._id} as it should have skipped already.`
      );
      await this.stations.runJob("INITIALIZE_STATION", {
        stationId: station._id,
      });
    }

    /**
     * Task body: sweeps the cached sessions, removing empty ones and ones
     * that were not refreshed for 30 days and have no open sockets. Failures
     * are logged per session, never thrown.
     *
     * @returns {Promise<void>}
     */
    async sessionClearingTask() {
      this.log(
        "INFO",
        "TASK_SESSION_CLEAR",
        `Checking for sessions to be cleared.`
      );

      let sessions;
      try {
        sessions = await this.cache.runJob("HGETALL", { table: "sessions" });
      } catch (err) {
        this.log(
          "ERROR",
          "TASK_SESSION_CLEAR",
          `Could not load sessions: ${TasksModule.describeError(err)}`
        );
        return;
      }
      if (!sessions) {
        return;
      }

      await Promise.all(
        Object.keys(sessions).map((sessionId) =>
          this.clearSessionIfStale(sessionId, sessions[sessionId]).catch(
            (err) => {
              this.log(
                "ERROR",
                "TASK_SESSION_CLEAR",
                `Could not process session ${sessionId}: ${TasksModule.describeError(err)}`
              );
            }
          )
        )
      );
    }

    /**
     * Applies the session retention rules to a single cached session.
     *
     * @param {string} sessionId - key of the entry in the "sessions" table
     * @param {object|null} session - cached value stored under that key
     * @returns {Promise<void>} rejects if a cache or socket lookup job fails
     */
    async clearSessionIfStale(sessionId, session) {
      const maxAge = 60 * 60 * 24 * 30 * 1000; // 30 days in milliseconds

      if (!session) {
        this.log(
          "INFO",
          "TASK_SESSION_CLEAR",
          "Removing an empty session."
        );
        await this.cache.runJob("HDEL", {
          table: "sessions",
          key: sessionId,
        });
        return;
      }

      if (!session.refreshDate) {
        // No refresh date recorded: stamp the session now instead of
        // deleting it.
        session.refreshDate = Date.now();
        await this.cache.runJob("HSET", {
          table: "sessions",
          key: sessionId,
          value: session,
        });
        return;
      }

      const age = Date.now() - session.refreshDate;
      if (age < maxAge) {
        return;
      }

      if (age > maxAge) {
        const response = await this.utils.runJob("SOCKETS_FROM_SESSION_ID", {
          sessionId: session.sessionId,
        });

        if (response.sockets.length > 0) {
          // Still in use: extend the session instead of removing it.
          session.refreshDate = Date.now();
          await this.cache.runJob("HSET", {
            table: "sessions",
            key: sessionId,
            value: session,
          });
          return;
        }

        this.log(
          "INFO",
          "TASK_SESSION_CLEAR",
          `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
        );
        // Delete under the key the entry was read from, matching every
        // other cache call here (the original used session.sessionId).
        await this.cache.runJob("HDEL", {
          table: "sessions",
          key: sessionId,
        });
        return;
      }

      // Only reached when the age is exactly the limit or not a number.
      this.log("ERROR", "TASK_SESSION_CLEAR", "This should never log.");
    }

    /**
     * Task body: logs a warning banner when any of the known log files is
     * larger than 25 MB (1 MB counted as 1,000,000 bytes).
     *
     * @returns {Promise<void>} rejects with an Error when a log file cannot
     *   be stat'ed (for example, when it does not exist)
     */
    async logFileSizeCheckTask() {
      this.log(
        "INFO",
        "TASK_LOG_FILE_SIZE_CHECK",
        `Checking the size for the log files.`
      );

      const fileNames = [
        "all.log",
        "debugStation.log",
        "error.log",
        "info.log",
        "success.log",
      ];

      let oversized;
      try {
        // `some` stops at the first oversized file; a stat failure before
        // that point is reported as an error, as in the original.
        oversized = fileNames.some((fileName) => {
          const stats = fs.statSync(`${__dirname}/../../log/${fileName}`);
          return stats.size / 1000000 > 25;
        });
      } catch (err) {
        const message = await this.utils.runJob("GET_ERROR", { error: err });
        throw new Error(message);
      }

      if (!oversized) {
        return;
      }

      const banner = [
        "************************************WARNING*************************************",
        "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************",
        "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****",
        "********************************************************************************",
      ];
      for (const line of banner) {
        this.log("ERROR", "LOGGER_FILE_SIZE_WARNING", line);
      }
    }
  }
  // Export a single TasksModule instance rather than the class itself.
  const tasksModule = new TasksModule();
  module.exports = tasksModule;