tasks.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. import async from "async";
  2. import fs from "fs";
  3. import path from "path";
  4. import { fileURLToPath } from "url";
  5. import CoreClass from "../core";
  6. import Timer from "../classes/Timer.class";
  7. const __dirname = path.dirname(fileURLToPath(import.meta.url));
  8. let TasksModule;
  9. let CacheModule;
  10. let StationsModule;
  11. let UtilsModule;
  12. let WSModule;
  13. let DBModule;
  14. class _TasksModule extends CoreClass {
  15. // eslint-disable-next-line require-jsdoc
  16. constructor() {
  17. super("tasks");
  18. this.tasks = {};
  19. TasksModule = this;
  20. }
  21. /**
  22. * Initialises the tasks module
  23. *
  24. * @returns {Promise} - returns promise (reject, resolve)
  25. */
  26. initialize() {
  27. return new Promise(resolve => {
  28. // return reject(new Error("Not fully migrated yet."));
  29. CacheModule = this.moduleManager.modules.cache;
  30. StationsModule = this.moduleManager.modules.stations;
  31. UtilsModule = this.moduleManager.modules.utils;
  32. WSModule = this.moduleManager.modules.ws;
  33. DBModule = this.moduleManager.modules.db;
  34. // this.createTask("testTask", testTask, 5000, true);
  35. TasksModule.runJob("CREATE_TASK", {
  36. name: "stationSkipTask",
  37. fn: TasksModule.checkStationSkipTask,
  38. timeout: 1000 * 60 * 30
  39. });
  40. TasksModule.runJob("CREATE_TASK", {
  41. name: "sessionClearTask",
  42. fn: TasksModule.sessionClearingTask,
  43. timeout: 1000 * 60 * 60 * 6
  44. });
  45. // TasksModule.runJob("CREATE_TASK", {
  46. // name: "logFileSizeCheckTask",
  47. // fn: TasksModule.logFileSizeCheckTask,
  48. // timeout: 1000 * 60 * 60
  49. // });
  50. TasksModule.runJob("CREATE_TASK", {
  51. name: "collectStationUsersTask",
  52. fn: TasksModule.collectStationUsersTask,
  53. timeout: 1000 * 3
  54. });
  55. resolve();
  56. });
  57. }
  58. /**
  59. * Creates a new task
  60. *
  61. * @param {object} payload - object that contains the payload
  62. * @param {string} payload.name - the name of the task
  63. * @param {string} payload.fn - the function the task will run
  64. * @param {string} payload.paused - if the task is currently paused
  65. * @param {boolean} payload.timeout - how often to run the task
  66. * @returns {Promise} - returns promise (reject, resolve)
  67. */
  68. CREATE_TASK(payload) {
  69. return new Promise((resolve, reject) => {
  70. TasksModule.tasks[payload.name] = {
  71. name: payload.name,
  72. fn: payload.fn,
  73. timeout: payload.timeout,
  74. lastRan: 0,
  75. timer: null
  76. };
  77. if (!payload.paused) {
  78. TasksModule.runJob("RUN_TASK", { name: payload.name }, this)
  79. .then(() => resolve())
  80. .catch(err => reject(err));
  81. } else resolve();
  82. });
  83. }
  84. /**
  85. * Pauses a task
  86. *
  87. * @param {object} payload - object that contains the payload
  88. * @param {string} payload.taskName - the name of the task to pause
  89. * @returns {Promise} - returns promise (reject, resolve)
  90. */
  91. PAUSE_TASK(payload) {
  92. const taskName = { payload };
  93. return new Promise(resolve => {
  94. if (TasksModule.tasks[taskName].timer) TasksModule.tasks[taskName].timer.pause();
  95. resolve();
  96. });
  97. }
  98. /**
  99. * Resumes a task
  100. *
  101. * @param {object} payload - object that contains the payload
  102. * @param {string} payload.name - the name of the task to resume
  103. * @returns {Promise} - returns promise (reject, resolve)
  104. */
  105. RESUME_TASK(payload) {
  106. return new Promise(resolve => {
  107. TasksModule.tasks[payload.name].timer.resume();
  108. resolve();
  109. });
  110. }
  111. /**
  112. * Runs a task's function and restarts the timer
  113. *
  114. * @param {object} payload - object that contains the payload
  115. * @param {string} payload.name - the name of the task to run
  116. * @returns {Promise} - returns promise (reject, resolve)
  117. */
  118. RUN_TASK(payload) {
  119. return new Promise(resolve => {
  120. const task = TasksModule.tasks[payload.name];
  121. if (task.timer) task.timer.pause();
  122. task.fn.apply(this).then(() => {
  123. task.lastRan = Date.now();
  124. task.timer = new Timer(
  125. () => TasksModule.runJob("RUN_TASK", { name: payload.name }),
  126. task.timeout,
  127. false
  128. );
  129. resolve();
  130. });
  131. });
  132. }
  133. /**
  134. * Periodically checks if any stations need to be skipped
  135. *
  136. * @returns {Promise} - returns promise (reject, resolve)
  137. */
  138. checkStationSkipTask() {
  139. return new Promise(resolve => {
  140. TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
  141. async.waterfall(
  142. [
  143. next => {
  144. CacheModule.runJob("HGETALL", { table: "stations" })
  145. .then(response => next(null, response))
  146. .catch(next);
  147. },
  148. (stations, next) => {
  149. async.each(
  150. stations,
  151. (station, next2) => {
  152. if (station.paused || !station.currentSong || !station.currentSong.title)
  153. return next2();
  154. const timeElapsed = Date.now() - station.startedAt - station.timePaused;
  155. if (timeElapsed <= station.currentSong.duration) return next2();
  156. TasksModule.log(
  157. "ERROR",
  158. "TASK_STATIONS_SKIP_CHECK",
  159. `Skipping ${station._id} as it should have skipped already.`
  160. );
  161. return StationsModule.runJob("INITIALIZE_STATION", {
  162. stationId: station._id
  163. }).then(() => next2());
  164. },
  165. () => next()
  166. );
  167. }
  168. ],
  169. () => resolve()
  170. );
  171. });
  172. }
  173. /**
  174. * Periodically checks if any sessions are out of date and need to be cleared
  175. *
  176. * @returns {Promise} - returns promise (reject, resolve)
  177. */
  178. sessionClearingTask() {
  179. return new Promise(resolve => {
  180. TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);
  181. async.waterfall(
  182. [
  183. next => {
  184. CacheModule.runJob("HGETALL", { table: "sessions" })
  185. .then(sessions => next(null, sessions))
  186. .catch(next);
  187. },
  188. (sessions, next) => {
  189. if (!sessions) return next();
  190. const keys = Object.keys(sessions);
  191. return async.each(
  192. keys,
  193. (sessionId, next2) => {
  194. const session = sessions[sessionId];
  195. if (
  196. session &&
  197. session.refreshDate &&
  198. Date.now() - session.refreshDate < 60 * 60 * 24 * 30 * 1000
  199. )
  200. return next2();
  201. if (!session) {
  202. TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
  203. return CacheModule.runJob("HDEL", {
  204. table: "sessions",
  205. key: sessionId
  206. }).finally(() => {
  207. next2();
  208. });
  209. }
  210. if (!session.refreshDate) {
  211. session.refreshDate = Date.now();
  212. return CacheModule.runJob("HSET", {
  213. table: "sessions",
  214. key: sessionId,
  215. value: session
  216. }).finally(() => next2());
  217. }
  218. if (Date.now() - session.refreshDate > 60 * 60 * 24 * 30 * 1000) {
  219. return WSModule.runJob("SOCKETS_FROM_SESSION_ID", {
  220. sessionId: session.sessionId
  221. }).then(response => {
  222. if (response.sockets.length > 0) {
  223. session.refreshDate = Date.now();
  224. CacheModule.runJob("HSET", {
  225. table: "sessions",
  226. key: sessionId,
  227. value: session
  228. }).finally(() => {
  229. next2();
  230. });
  231. } else {
  232. TasksModule.log(
  233. "INFO",
  234. "TASK_SESSION_CLEAR",
  235. `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
  236. );
  237. CacheModule.runJob("HDEL", {
  238. table: "sessions",
  239. key: session.sessionId
  240. }).finally(() => next2());
  241. }
  242. });
  243. }
  244. TasksModule.log("ERROR", "TASK_SESSION_CLEAR", "This should never log.");
  245. return next2();
  246. },
  247. () => next()
  248. );
  249. }
  250. ],
  251. () => resolve()
  252. );
  253. });
  254. }
  255. /**
  256. * Periodically warns about the size of any log files
  257. *
  258. * @returns {Promise} - returns promise (reject, resolve)
  259. */
  260. logFileSizeCheckTask() {
  261. return new Promise((resolve, reject) => {
  262. TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);
  263. async.each(
  264. ["all.log", "debugStation.log", "error.log", "info.log", "success.log"],
  265. (fileName, next) => {
  266. try {
  267. const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));
  268. const mb = stats.size / 1000000;
  269. if (mb > 25) return next(true);
  270. return next();
  271. } catch (err) {
  272. return next(err);
  273. }
  274. },
  275. async err => {
  276. if (err && err !== true) {
  277. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  278. return reject(new Error(err));
  279. }
  280. if (err === true) {
  281. TasksModule.log(
  282. "ERROR",
  283. "LOGGER_FILE_SIZE_WARNING",
  284. "************************************WARNING*************************************"
  285. );
  286. TasksModule.log(
  287. "ERROR",
  288. "LOGGER_FILE_SIZE_WARNING",
  289. "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"
  290. );
  291. TasksModule.log(
  292. "ERROR",
  293. "LOGGER_FILE_SIZE_WARNING",
  294. "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"
  295. );
  296. TasksModule.log(
  297. "ERROR",
  298. "LOGGER_FILE_SIZE_WARNING",
  299. "********************************************************************************"
  300. );
  301. }
  302. return resolve();
  303. }
  304. );
  305. });
  306. }
  307. /**
  308. * Periodically collect users in stations
  309. *
  310. * @returns {Promise} - returns promise (reject, resolve)
  311. */
  312. async collectStationUsersTask() {
  313. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  314. return new Promise(resolve => {
  315. TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);
  316. const stationsCountUpdated = [];
  317. const stationsUpdated = [];
  318. const oldUsersPerStation = StationsModule.usersPerStation;
  319. const usersPerStation = { loggedIn: [], loggedOut: [] };
  320. const oldUsersPerStationCount = JSON.parse(JSON.stringify(StationsModule.usersPerStationCount));
  321. const usersPerStationCount = {};
  322. async.each(
  323. Object.keys(StationsModule.userList),
  324. (socketId, next) => {
  325. WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }).then(async socket => {
  326. const stationId = StationsModule.userList[socketId];
  327. const room = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: `station.${stationId}` });
  328. if (!socket || room.includes(socketId)) {
  329. if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  330. if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(String(stationId));
  331. delete StationsModule.userList[socketId];
  332. return next();
  333. }
  334. if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0; // start count for station
  335. if (!usersPerStation[stationId]) usersPerStation[stationId] = { loggedIn: [], loggedOut: [] };
  336. return async.waterfall(
  337. [
  338. next => {
  339. if (!socket.session || !socket.session.sessionId) {
  340. return next("No session found.", { ip: socket.ip });
  341. }
  342. return CacheModule.runJob("HGET", {
  343. table: "sessions",
  344. key: socket.session.sessionId
  345. })
  346. .then(session => next(null, session))
  347. .catch(next);
  348. },
  349. (session, next) => {
  350. if (!session) return next("Session not found.");
  351. return userModel.findOne({ _id: session.userId }, next);
  352. },
  353. (user, next) => {
  354. if (!user) return next("User not found.");
  355. if (usersPerStation[stationId].loggedIn.some(u => user.username === u.username))
  356. return next("User already in the list.");
  357. usersPerStationCount[stationId] += 1; // increment user count for station
  358. return next(null, {
  359. username: user.username,
  360. name: user.name,
  361. avatar: user.avatar
  362. });
  363. }
  364. ],
  365. (err, user) => {
  366. if (!err) usersPerStation[stationId].loggedIn.push(user);
  367. // if user is logged out (an ip can only be counted once)
  368. if (
  369. err === "No session found." &&
  370. !usersPerStation[stationId].loggedOut.some(u => user.ip === u.ip)
  371. ) {
  372. usersPerStationCount[stationId] += 1; // increment user count for station
  373. usersPerStation[stationId].loggedOut.push(user);
  374. }
  375. next();
  376. }
  377. );
  378. });
  379. },
  380. () => {
  381. Object.keys(usersPerStationCount).forEach(stationId => {
  382. if (
  383. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId] &&
  384. stationsCountUpdated.indexOf(stationId) === -1
  385. ) {
  386. this.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  387. CacheModule.runJob("PUB", {
  388. channel: "station.updateUserCount",
  389. value: { stationId, usersPerStationCount: usersPerStationCount[stationId] }
  390. });
  391. }
  392. });
  393. Object.keys(usersPerStation).forEach(stationId => {
  394. if (
  395. !oldUsersPerStation[stationId] ||
  396. JSON.stringify(oldUsersPerStation[stationId]) !==
  397. JSON.stringify(usersPerStation[stationId]) ||
  398. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]
  399. ) {
  400. this.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  401. CacheModule.runJob("PUB", {
  402. channel: "station.updateUsers",
  403. value: { stationId, usersPerStation: usersPerStation[stationId] }
  404. });
  405. }
  406. });
  407. StationsModule.usersPerStationCount = usersPerStationCount;
  408. StationsModule.usersPerStation = usersPerStation;
  409. }
  410. );
  411. resolve();
  412. });
  413. }
  414. }
  415. export default new _TasksModule();