tasks.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. import async from "async";
  2. import fs from "fs";
  3. import path from "path";
  4. import { fileURLToPath } from "url";
  5. import CoreClass from "../core";
  6. import Timer from "../classes/Timer.class";
  // ES modules have no built-in __dirname, so it is derived from this file's URL.
  // It is used by logFileSizeCheckTask to locate the log directory.
  7. const __dirname = path.dirname(fileURLToPath(import.meta.url));
  // Module-level references shared by every method of the class below.
  // TasksModule is assigned in the _TasksModule constructor; the others are
  // assigned in initialize() from this.moduleManager.modules.
  8. let TasksModule;
  9. let CacheModule;
  10. let StationsModule;
  11. let UtilsModule;
  12. let WSModule;
  13. let DBModule;
  14. class _TasksModule extends CoreClass {
  15. // eslint-disable-next-line require-jsdoc
  16. constructor() {
  17. super("tasks");
  18. this.tasks = {};
  19. TasksModule = this;
  20. }
  21. /**
  22. * Initialises the tasks module
  23. *
  24. * @returns {Promise} - returns promise (reject, resolve)
  25. */
  26. initialize() {
  27. return new Promise(resolve => {
  28. CacheModule = this.moduleManager.modules.cache;
  29. StationsModule = this.moduleManager.modules.stations;
  30. UtilsModule = this.moduleManager.modules.utils;
  31. WSModule = this.moduleManager.modules.ws;
  32. DBModule = this.moduleManager.modules.db;
  33. TasksModule.runJob("CREATE_TASK", {
  34. name: "stationSkipTask",
  35. fn: TasksModule.checkStationSkipTask,
  36. timeout: 1000 * 60 * 30
  37. });
  38. TasksModule.runJob("CREATE_TASK", {
  39. name: "sessionClearTask",
  40. fn: TasksModule.sessionClearingTask,
  41. timeout: 1000 * 60 * 60 * 6
  42. });
  43. // TasksModule.runJob("CREATE_TASK", {
  44. // name: "logFileSizeCheckTask",
  45. // fn: TasksModule.logFileSizeCheckTask,
  46. // timeout: 1000 * 60 * 60
  47. // });
  48. TasksModule.runJob("CREATE_TASK", {
  49. name: "collectStationUsersTask",
  50. fn: TasksModule.collectStationUsersTask,
  51. timeout: 1000 * 3
  52. });
  53. resolve();
  54. });
  55. }
  56. /**
  57. * Creates a new task
  58. *
  59. * @param {object} payload - object that contains the payload
  60. * @param {string} payload.name - the name of the task
  61. * @param {string} payload.fn - the function the task will run
  62. * @param {string} payload.paused - if the task is currently paused
  63. * @param {boolean} payload.timeout - how often to run the task
  64. * @returns {Promise} - returns promise (reject, resolve)
  65. */
  66. CREATE_TASK(payload) {
  67. return new Promise((resolve, reject) => {
  68. TasksModule.tasks[payload.name] = {
  69. name: payload.name,
  70. fn: payload.fn,
  71. timeout: payload.timeout,
  72. lastRan: 0,
  73. timer: null
  74. };
  75. if (!payload.paused) {
  76. TasksModule.runJob("RUN_TASK", { name: payload.name }, this)
  77. .then(() => resolve())
  78. .catch(err => reject(err));
  79. } else resolve();
  80. });
  81. }
  82. /**
  83. * Pauses a task
  84. *
  85. * @param {object} payload - object that contains the payload
  86. * @param {string} payload.taskName - the name of the task to pause
  87. * @returns {Promise} - returns promise (reject, resolve)
  88. */
  89. PAUSE_TASK(payload) {
  90. const taskName = { payload };
  91. return new Promise(resolve => {
  92. if (TasksModule.tasks[taskName].timer) TasksModule.tasks[taskName].timer.pause();
  93. resolve();
  94. });
  95. }
  96. /**
  97. * Resumes a task
  98. *
  99. * @param {object} payload - object that contains the payload
  100. * @param {string} payload.name - the name of the task to resume
  101. * @returns {Promise} - returns promise (reject, resolve)
  102. */
  103. RESUME_TASK(payload) {
  104. return new Promise(resolve => {
  105. TasksModule.tasks[payload.name].timer.resume();
  106. resolve();
  107. });
  108. }
  109. /**
  110. * Runs a task's function and restarts the timer
  111. *
  112. * @param {object} payload - object that contains the payload
  113. * @param {string} payload.name - the name of the task to run
  114. * @returns {Promise} - returns promise (reject, resolve)
  115. */
  116. RUN_TASK(payload) {
  117. return new Promise(resolve => {
  118. const task = TasksModule.tasks[payload.name];
  119. if (task.timer) task.timer.pause();
  120. task.fn.apply(this).then(() => {
  121. task.lastRan = Date.now();
  122. task.timer = new Timer(
  123. () => TasksModule.runJob("RUN_TASK", { name: payload.name }),
  124. task.timeout,
  125. false
  126. );
  127. resolve();
  128. });
  129. });
  130. }
  131. /**
  132. * Periodically checks if any stations need to be skipped
  133. *
  134. * @returns {Promise} - returns promise (reject, resolve)
  135. */
  136. checkStationSkipTask() {
  137. return new Promise(resolve => {
  138. TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
  139. async.waterfall(
  140. [
  141. next => {
  142. CacheModule.runJob("HGETALL", { table: "stations" })
  143. .then(response => next(null, response))
  144. .catch(next);
  145. },
  146. (stations, next) => {
  147. async.each(
  148. stations,
  149. (station, next2) => {
  150. if (station.paused || !station.currentSong || !station.currentSong.title)
  151. return next2();
  152. const timeElapsed = Date.now() - station.startedAt - station.timePaused;
  153. if (timeElapsed <= station.currentSong.duration) return next2();
  154. TasksModule.log(
  155. "ERROR",
  156. "TASK_STATIONS_SKIP_CHECK",
  157. `Skipping ${station._id} as it should have skipped already.`
  158. );
  159. return StationsModule.runJob("INITIALIZE_STATION", {
  160. stationId: station._id
  161. }).then(() => next2());
  162. },
  163. () => next()
  164. );
  165. }
  166. ],
  167. () => resolve()
  168. );
  169. });
  170. }
  171. /**
  172. * Periodically checks if any sessions are out of date and need to be cleared
  173. *
  174. * @returns {Promise} - returns promise (reject, resolve)
  175. */
  176. sessionClearingTask() {
  177. return new Promise(resolve => {
  178. TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);
  179. async.waterfall(
  180. [
  181. next => {
  182. CacheModule.runJob("HGETALL", { table: "sessions" })
  183. .then(sessions => next(null, sessions))
  184. .catch(next);
  185. },
  186. (sessions, next) => {
  187. if (!sessions) return next();
  188. const keys = Object.keys(sessions);
  189. return async.each(
  190. keys,
  191. (sessionId, next2) => {
  192. const session = sessions[sessionId];
  193. if (
  194. session &&
  195. session.refreshDate &&
  196. Date.now() - session.refreshDate < 60 * 60 * 24 * 30 * 1000
  197. )
  198. return next2();
  199. if (!session) {
  200. TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
  201. return CacheModule.runJob("HDEL", {
  202. table: "sessions",
  203. key: sessionId
  204. }).finally(() => {
  205. next2();
  206. });
  207. }
  208. if (!session.refreshDate) {
  209. session.refreshDate = Date.now();
  210. return CacheModule.runJob("HSET", {
  211. table: "sessions",
  212. key: sessionId,
  213. value: session
  214. }).finally(() => next2());
  215. }
  216. if (Date.now() - session.refreshDate > 60 * 60 * 24 * 30 * 1000) {
  217. return WSModule.runJob("SOCKETS_FROM_SESSION_ID", {
  218. sessionId: session.sessionId
  219. }).then(sockets => {
  220. if (sockets.length > 0) {
  221. session.refreshDate = Date.now();
  222. CacheModule.runJob("HSET", {
  223. table: "sessions",
  224. key: sessionId,
  225. value: session
  226. }).finally(() => {
  227. next2();
  228. });
  229. } else {
  230. TasksModule.log(
  231. "INFO",
  232. "TASK_SESSION_CLEAR",
  233. `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
  234. );
  235. CacheModule.runJob("HDEL", {
  236. table: "sessions",
  237. key: session.sessionId
  238. }).finally(() => next2());
  239. }
  240. });
  241. }
  242. TasksModule.log("ERROR", "TASK_SESSION_CLEAR", "This should never log.");
  243. return next2();
  244. },
  245. () => next()
  246. );
  247. }
  248. ],
  249. () => resolve()
  250. );
  251. });
  252. }
  253. /**
  254. * Periodically warns about the size of any log files
  255. *
  256. * @returns {Promise} - returns promise (reject, resolve)
  257. */
  258. logFileSizeCheckTask() {
  259. return new Promise((resolve, reject) => {
  260. TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);
  261. async.each(
  262. ["all.log", "debugStation.log", "error.log", "info.log", "success.log"],
  263. (fileName, next) => {
  264. try {
  265. const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));
  266. const mb = stats.size / 1000000;
  267. if (mb > 25) return next(true);
  268. return next();
  269. } catch (err) {
  270. return next(err);
  271. }
  272. },
  273. async err => {
  274. if (err && err !== true) {
  275. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  276. return reject(new Error(err));
  277. }
  278. if (err === true) {
  279. TasksModule.log(
  280. "ERROR",
  281. "LOGGER_FILE_SIZE_WARNING",
  282. "************************************WARNING*************************************"
  283. );
  284. TasksModule.log(
  285. "ERROR",
  286. "LOGGER_FILE_SIZE_WARNING",
  287. "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"
  288. );
  289. TasksModule.log(
  290. "ERROR",
  291. "LOGGER_FILE_SIZE_WARNING",
  292. "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"
  293. );
  294. TasksModule.log(
  295. "ERROR",
  296. "LOGGER_FILE_SIZE_WARNING",
  297. "********************************************************************************"
  298. );
  299. }
  300. return resolve();
  301. }
  302. );
  303. });
  304. }
  305. /**
  306. * Periodically collect users in stations
  307. *
  308. * @returns {Promise} - returns promise (reject, resolve)
  309. */
  310. async collectStationUsersTask() {
  311. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  312. return new Promise(resolve => {
  313. TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);
  314. const stationsCountUpdated = [];
  315. const stationsUpdated = [];
  316. const oldUsersPerStation = StationsModule.usersPerStation;
  317. const usersPerStation = { loggedIn: [], loggedOut: [] };
  318. const oldUsersPerStationCount = JSON.parse(JSON.stringify(StationsModule.usersPerStationCount));
  319. const usersPerStationCount = {};
  320. async.each(
  321. Object.keys(StationsModule.userList),
  322. (socketId, next) => {
  323. WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }).then(async socket => {
  324. const stationId = StationsModule.userList[socketId];
  325. const room = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
  326. room: `station.${stationId}`
  327. });
  328. if (!socket || !room.includes(socketId)) {
  329. if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  330. if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(String(stationId));
  331. delete StationsModule.userList[socketId];
  332. return next();
  333. }
  334. if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0; // start count for station
  335. if (!usersPerStation[stationId]) usersPerStation[stationId] = { loggedIn: [], loggedOut: [] };
  336. return async.waterfall(
  337. [
  338. next => {
  339. if (!socket.session || !socket.session.sessionId) {
  340. return next("No session found.", { ip: socket.ip });
  341. }
  342. return CacheModule.runJob("HGET", {
  343. table: "sessions",
  344. key: socket.session.sessionId
  345. })
  346. .then(session => next(null, session))
  347. .catch(next);
  348. },
  349. (session, next) => {
  350. if (!session) return next("Session not found.");
  351. return userModel.findOne({ _id: session.userId }, next);
  352. },
  353. (user, next) => {
  354. if (!user) return next("User not found.");
  355. if (usersPerStation[stationId].loggedIn.some(u => user.username === u.username))
  356. return next("User already in the list.");
  357. usersPerStationCount[stationId] += 1; // increment user count for station
  358. return next(null, {
  359. _id: user._id,
  360. username: user.username,
  361. name: user.name,
  362. avatar: user.avatar
  363. });
  364. }
  365. ],
  366. (err, user) => {
  367. if (!err) usersPerStation[stationId].loggedIn.push(user);
  368. // if user is logged out (an ip can only be counted once)
  369. if (
  370. err === "No session found." &&
  371. !usersPerStation[stationId].loggedOut.some(u => user.ip === u.ip)
  372. ) {
  373. usersPerStationCount[stationId] += 1; // increment user count for station
  374. usersPerStation[stationId].loggedOut.push(user);
  375. }
  376. next();
  377. }
  378. );
  379. });
  380. },
  381. () => {
  382. Object.keys(usersPerStationCount).forEach(stationId => {
  383. if (
  384. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId] &&
  385. stationsCountUpdated.indexOf(stationId) === -1
  386. ) {
  387. this.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  388. CacheModule.runJob("PUB", {
  389. channel: "station.updateUserCount",
  390. value: { stationId, usersPerStationCount: usersPerStationCount[stationId] }
  391. });
  392. }
  393. });
  394. Object.keys(usersPerStation).forEach(stationId => {
  395. if (
  396. !oldUsersPerStation[stationId] ||
  397. JSON.stringify(oldUsersPerStation[stationId]) !==
  398. JSON.stringify(usersPerStation[stationId]) ||
  399. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]
  400. ) {
  401. this.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  402. CacheModule.runJob("PUB", {
  403. channel: "station.updateUsers",
  404. value: { stationId, usersPerStation: usersPerStation[stationId] }
  405. });
  406. }
  407. });
  408. StationsModule.usersPerStationCount = usersPerStationCount;
  409. StationsModule.usersPerStation = usersPerStation;
  410. }
  411. );
  412. resolve();
  413. });
  414. }
  415. }
  416. export default new _TasksModule();