tasks.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. import async from "async";
  2. import fs from "fs";
  3. import path from "path";
  4. import { fileURLToPath } from "url";
  5. import CoreClass from "../core";
  6. import Timer from "../classes/Timer.class";
  7. const __dirname = path.dirname(fileURLToPath(import.meta.url));
  8. let TasksModule;
  9. let CacheModule;
  10. let StationsModule;
  11. let UtilsModule;
  12. let WSModule;
  13. let DBModule;
  14. class _TasksModule extends CoreClass {
  15. // eslint-disable-next-line require-jsdoc
  16. constructor() {
  17. super("tasks");
  18. this.tasks = {};
  19. TasksModule = this;
  20. }
  21. /**
  22. * Initialises the tasks module
  23. *
  24. * @returns {Promise} - returns promise (reject, resolve)
  25. */
  26. initialize() {
  27. return new Promise(resolve => {
  28. CacheModule = this.moduleManager.modules.cache;
  29. StationsModule = this.moduleManager.modules.stations;
  30. UtilsModule = this.moduleManager.modules.utils;
  31. WSModule = this.moduleManager.modules.ws;
  32. DBModule = this.moduleManager.modules.db;
  33. TasksModule.runJob("CREATE_TASK", {
  34. name: "stationSkipTask",
  35. fn: TasksModule.checkStationSkipTask,
  36. timeout: 1000 * 60 * 30
  37. });
  38. TasksModule.runJob("CREATE_TASK", {
  39. name: "sessionClearTask",
  40. fn: TasksModule.sessionClearingTask,
  41. timeout: 1000 * 60 * 60 * 6
  42. });
  43. // TasksModule.runJob("CREATE_TASK", {
  44. // name: "logFileSizeCheckTask",
  45. // fn: TasksModule.logFileSizeCheckTask,
  46. // timeout: 1000 * 60 * 60
  47. // });
  48. TasksModule.runJob("CREATE_TASK", {
  49. name: "collectStationUsersTask",
  50. fn: TasksModule.collectStationUsersTask,
  51. timeout: 1000 * 3
  52. });
  53. TasksModule.runJob("CREATE_TASK", {
  54. name: "historyClearTask",
  55. fn: TasksModule.historyClearTask,
  56. timeout: 1000 * 60 * 60 * 6
  57. });
  58. resolve();
  59. });
  60. }
  61. /**
  62. * Creates a new task
  63. *
  64. * @param {object} payload - object that contains the payload
  65. * @param {string} payload.name - the name of the task
  66. * @param {string} payload.fn - the function the task will run
  67. * @param {string} payload.paused - if the task is currently paused
  68. * @param {boolean} payload.timeout - how often to run the task
  69. * @returns {Promise} - returns promise (reject, resolve)
  70. */
  71. CREATE_TASK(payload) {
  72. return new Promise((resolve, reject) => {
  73. TasksModule.tasks[payload.name] = {
  74. name: payload.name,
  75. fn: payload.fn,
  76. timeout: payload.timeout,
  77. lastRan: 0,
  78. timer: null
  79. };
  80. if (!payload.paused) {
  81. TasksModule.runJob("RUN_TASK", { name: payload.name }, this)
  82. .then(() => resolve())
  83. .catch(err => reject(err));
  84. } else resolve();
  85. });
  86. }
  87. /**
  88. * Pauses a task
  89. *
  90. * @param {object} payload - object that contains the payload
  91. * @param {string} payload.taskName - the name of the task to pause
  92. * @returns {Promise} - returns promise (reject, resolve)
  93. */
  94. PAUSE_TASK(payload) {
  95. const taskName = { payload };
  96. return new Promise(resolve => {
  97. if (TasksModule.tasks[taskName].timer) TasksModule.tasks[taskName].timer.pause();
  98. resolve();
  99. });
  100. }
  101. /**
  102. * Resumes a task
  103. *
  104. * @param {object} payload - object that contains the payload
  105. * @param {string} payload.name - the name of the task to resume
  106. * @returns {Promise} - returns promise (reject, resolve)
  107. */
  108. RESUME_TASK(payload) {
  109. return new Promise(resolve => {
  110. TasksModule.tasks[payload.name].timer.resume();
  111. resolve();
  112. });
  113. }
  114. /**
  115. * Runs a task's function and restarts the timer
  116. *
  117. * @param {object} payload - object that contains the payload
  118. * @param {string} payload.name - the name of the task to run
  119. * @returns {Promise} - returns promise (reject, resolve)
  120. */
  121. RUN_TASK(payload) {
  122. return new Promise(resolve => {
  123. const task = TasksModule.tasks[payload.name];
  124. if (task.timer) task.timer.pause();
  125. task.fn.apply(this).then(() => {
  126. task.lastRan = Date.now();
  127. task.timer = new Timer(
  128. () => TasksModule.runJob("RUN_TASK", { name: payload.name }),
  129. task.timeout,
  130. false
  131. );
  132. resolve();
  133. });
  134. });
  135. }
  136. /**
  137. * Periodically checks if any stations need to be skipped
  138. *
  139. * @returns {Promise} - returns promise (reject, resolve)
  140. */
  141. checkStationSkipTask() {
  142. return new Promise(resolve => {
  143. TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
  144. async.waterfall(
  145. [
  146. next => {
  147. CacheModule.runJob("HGETALL", { table: "stations" })
  148. .then(response => next(null, response))
  149. .catch(next);
  150. },
  151. (stations, next) => {
  152. async.each(
  153. stations,
  154. (station, next2) => {
  155. if (station.paused || !station.currentSong || !station.currentSong.title)
  156. return next2();
  157. const timeElapsed = Date.now() - station.startedAt - station.timePaused;
  158. if (timeElapsed <= station.currentSong.duration) return next2();
  159. TasksModule.log(
  160. "ERROR",
  161. "TASK_STATIONS_SKIP_CHECK",
  162. `Skipping ${station._id} as it should have skipped already.`
  163. );
  164. return StationsModule.runJob("INITIALIZE_STATION", {
  165. stationId: station._id
  166. }).then(() => next2());
  167. },
  168. () => next()
  169. );
  170. }
  171. ],
  172. () => resolve()
  173. );
  174. });
  175. }
  176. /**
  177. * Periodically checks if any sessions are out of date and need to be cleared
  178. *
  179. * @returns {Promise} - returns promise (reject, resolve)
  180. */
  181. sessionClearingTask() {
  182. return new Promise(resolve => {
  183. TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);
  184. async.waterfall(
  185. [
  186. next => {
  187. CacheModule.runJob("HGETALL", { table: "sessions" })
  188. .then(sessions => next(null, sessions))
  189. .catch(next);
  190. },
  191. (sessions, next) => {
  192. if (!sessions) return next();
  193. const keys = Object.keys(sessions);
  194. return async.each(
  195. keys,
  196. (sessionId, next2) => {
  197. const session = sessions[sessionId];
  198. if (
  199. session &&
  200. session.refreshDate &&
  201. Date.now() - session.refreshDate < 60 * 60 * 24 * 30 * 1000
  202. )
  203. return next2();
  204. if (!session) {
  205. TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
  206. return CacheModule.runJob("HDEL", {
  207. table: "sessions",
  208. key: sessionId
  209. }).finally(() => {
  210. next2();
  211. });
  212. }
  213. if (!session.refreshDate) {
  214. session.refreshDate = Date.now();
  215. return CacheModule.runJob("HSET", {
  216. table: "sessions",
  217. key: sessionId,
  218. value: session
  219. }).finally(() => next2());
  220. }
  221. if (Date.now() - session.refreshDate > 60 * 60 * 24 * 30 * 1000) {
  222. return WSModule.runJob("SOCKETS_FROM_SESSION_ID", {
  223. sessionId: session.sessionId
  224. }).then(sockets => {
  225. if (sockets.length > 0) {
  226. session.refreshDate = Date.now();
  227. CacheModule.runJob("HSET", {
  228. table: "sessions",
  229. key: sessionId,
  230. value: session
  231. }).finally(() => {
  232. next2();
  233. });
  234. } else {
  235. TasksModule.log(
  236. "INFO",
  237. "TASK_SESSION_CLEAR",
  238. `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
  239. );
  240. CacheModule.runJob("HDEL", {
  241. table: "sessions",
  242. key: session.sessionId
  243. }).finally(() => next2());
  244. }
  245. });
  246. }
  247. TasksModule.log("ERROR", "TASK_SESSION_CLEAR", "This should never log.");
  248. return next2();
  249. },
  250. () => next()
  251. );
  252. }
  253. ],
  254. () => resolve()
  255. );
  256. });
  257. }
  258. /**
  259. * Periodically warns about the size of any log files
  260. *
  261. * @returns {Promise} - returns promise (reject, resolve)
  262. */
  263. logFileSizeCheckTask() {
  264. return new Promise((resolve, reject) => {
  265. TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);
  266. async.each(
  267. ["all.log", "debugStation.log", "error.log", "info.log", "success.log"],
  268. (fileName, next) => {
  269. try {
  270. const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));
  271. const mb = stats.size / 1000000;
  272. if (mb > 25) return next(true);
  273. return next();
  274. } catch (err) {
  275. return next(err);
  276. }
  277. },
  278. async err => {
  279. if (err && err !== true) {
  280. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  281. return reject(new Error(err));
  282. }
  283. if (err === true) {
  284. TasksModule.log(
  285. "ERROR",
  286. "LOGGER_FILE_SIZE_WARNING",
  287. "************************************WARNING*************************************"
  288. );
  289. TasksModule.log(
  290. "ERROR",
  291. "LOGGER_FILE_SIZE_WARNING",
  292. "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"
  293. );
  294. TasksModule.log(
  295. "ERROR",
  296. "LOGGER_FILE_SIZE_WARNING",
  297. "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"
  298. );
  299. TasksModule.log(
  300. "ERROR",
  301. "LOGGER_FILE_SIZE_WARNING",
  302. "********************************************************************************"
  303. );
  304. }
  305. return resolve();
  306. }
  307. );
  308. });
  309. }
  310. /**
  311. * Periodically collect users in stations
  312. *
  313. * @returns {Promise} - returns promise (reject, resolve)
  314. */
  315. async collectStationUsersTask() {
  316. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  317. return new Promise(resolve => {
  318. TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);
  319. const stationsCountUpdated = [];
  320. const stationsUpdated = [];
  321. const oldUsersPerStation = StationsModule.usersPerStation;
  322. const usersPerStation = { loggedIn: [], loggedOut: [] };
  323. const oldUsersPerStationCount = JSON.parse(JSON.stringify(StationsModule.usersPerStationCount));
  324. const usersPerStationCount = {};
  325. async.each(
  326. Object.keys(StationsModule.userList),
  327. (socketId, next) => {
  328. WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }).then(async socket => {
  329. const stationId = StationsModule.userList[socketId];
  330. const room = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
  331. room: `station.${stationId}`
  332. });
  333. if (!socket || !room.includes(socketId)) {
  334. if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  335. if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(String(stationId));
  336. delete StationsModule.userList[socketId];
  337. return next();
  338. }
  339. if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0; // start count for station
  340. if (!usersPerStation[stationId]) usersPerStation[stationId] = { loggedIn: [], loggedOut: [] };
  341. return async.waterfall(
  342. [
  343. next => {
  344. if (!socket.session || !socket.session.sessionId) {
  345. return next("No session found.", { ip: socket.ip });
  346. }
  347. return CacheModule.runJob("HGET", {
  348. table: "sessions",
  349. key: socket.session.sessionId
  350. })
  351. .then(session => next(null, session))
  352. .catch(next);
  353. },
  354. (session, next) => {
  355. if (!session) return next("Session not found.");
  356. return userModel.findOne({ _id: session.userId }, next);
  357. },
  358. (user, next) => {
  359. if (!user) return next("User not found.");
  360. if (usersPerStation[stationId].loggedIn.some(u => user.username === u.username))
  361. return next("User already in the list.");
  362. usersPerStationCount[stationId] += 1; // increment user count for station
  363. return next(null, {
  364. _id: user._id,
  365. username: user.username,
  366. name: user.name,
  367. avatar: user.avatar
  368. });
  369. }
  370. ],
  371. (err, user) => {
  372. if (!err) usersPerStation[stationId].loggedIn.push(user);
  373. // if user is logged out (an ip can only be counted once)
  374. if (
  375. err === "No session found." &&
  376. !usersPerStation[stationId].loggedOut.some(u => user.ip === u.ip)
  377. ) {
  378. usersPerStationCount[stationId] += 1; // increment user count for station
  379. usersPerStation[stationId].loggedOut.push(user);
  380. }
  381. next();
  382. }
  383. );
  384. });
  385. },
  386. () => {
  387. Object.keys(usersPerStationCount).forEach(stationId => {
  388. if (
  389. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId] &&
  390. stationsCountUpdated.indexOf(stationId) === -1
  391. ) {
  392. this.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  393. CacheModule.runJob("PUB", {
  394. channel: "station.updateUserCount",
  395. value: { stationId, usersPerStationCount: usersPerStationCount[stationId] }
  396. });
  397. }
  398. });
  399. Object.keys(usersPerStation).forEach(stationId => {
  400. if (
  401. !oldUsersPerStation[stationId] ||
  402. JSON.stringify(oldUsersPerStation[stationId]) !==
  403. JSON.stringify(usersPerStation[stationId]) ||
  404. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]
  405. ) {
  406. this.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  407. CacheModule.runJob("PUB", {
  408. channel: "station.updateUsers",
  409. value: { stationId, usersPerStation: usersPerStation[stationId] }
  410. });
  411. }
  412. });
  413. StationsModule.usersPerStationCount = usersPerStationCount;
  414. StationsModule.usersPerStation = usersPerStation;
  415. }
  416. );
  417. resolve();
  418. });
  419. }
  420. /**
  421. * Periodically removes any old history documents
  422. *
  423. * @returns {Promise} - returns promise (reject, resolve)
  424. */
  425. async historyClearTask() {
  426. TasksModule.log("INFO", "TASK_HISTORY_CLEAR", `Removing old history.`);
  427. const stationHistoryModel = await DBModule.runJob("GET_MODEL", { modelName: "stationHistory" });
  428. // Remove documents created more than 2 days ago
  429. const mongoQuery = { "payload.skippedAt": { $lt: new Date(new Date().getTime() - 1000 * 60 * 60 * 24 * 2) } };
  430. const count = await stationHistoryModel.count(mongoQuery);
  431. await stationHistoryModel.remove(mongoQuery);
  432. TasksModule.log("SUCCESS", "TASK_HISTORY_CLEAR", `Removed ${count} history items`);
  433. }
  434. }
  435. export default new _TasksModule();