tasks.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. import async from "async";
  2. import fs from "fs";
  3. import path from "path";
  4. import { fileURLToPath } from "url";
  5. import CoreClass from "../core";
  6. import Timer from "../classes/Timer.class";
  7. const __dirname = path.dirname(fileURLToPath(import.meta.url));
  // Module-level references shared by every method in this file.
  // TasksModule is assigned in the _TasksModule constructor; the other five are
  // assigned from this.moduleManager.modules inside initialize().
  8. let TasksModule;
  9. let CacheModule;
  10. let StationsModule;
  11. let UtilsModule;
  12. let WSModule;
  13. let DBModule;
  14. class _TasksModule extends CoreClass {
  15. // eslint-disable-next-line require-jsdoc
  16. constructor() {
  17. super("tasks");
  18. this.tasks = {};
  19. TasksModule = this;
  20. }
  21. /**
  22. * Initialises the tasks module
  23. *
  24. * @returns {Promise} - returns promise (reject, resolve)
  25. */
  26. initialize() {
  27. return new Promise(resolve => {
  28. CacheModule = this.moduleManager.modules.cache;
  29. StationsModule = this.moduleManager.modules.stations;
  30. UtilsModule = this.moduleManager.modules.utils;
  31. WSModule = this.moduleManager.modules.ws;
  32. DBModule = this.moduleManager.modules.db;
  33. TasksModule.runJob("CREATE_TASK", {
  34. name: "stationSkipTask",
  35. fn: TasksModule.checkStationSkipTask,
  36. timeout: 1000 * 60 * 30
  37. });
  38. TasksModule.runJob("CREATE_TASK", {
  39. name: "sessionClearTask",
  40. fn: TasksModule.sessionClearingTask,
  41. timeout: 1000 * 60 * 60 * 6
  42. });
  43. TasksModule.runJob("CREATE_TASK", {
  44. name: "collectStationUsersTask",
  45. fn: TasksModule.collectStationUsersTask,
  46. timeout: 1000 * 3
  47. });
  48. resolve();
  49. });
  50. }
  51. /**
  52. * Creates a new task
  53. *
  54. * @param {object} payload - object that contains the payload
  55. * @param {string} payload.name - the name of the task
  56. * @param {string} payload.fn - the function the task will run
  57. * @param {string} payload.paused - if the task is currently paused
  58. * @param {boolean} payload.timeout - how often to run the task
  59. * @returns {Promise} - returns promise (reject, resolve)
  60. */
  61. CREATE_TASK(payload) {
  62. return new Promise((resolve, reject) => {
  63. TasksModule.tasks[payload.name] = {
  64. name: payload.name,
  65. fn: payload.fn,
  66. timeout: payload.timeout,
  67. lastRan: 0,
  68. timer: null
  69. };
  70. if (!payload.paused) {
  71. TasksModule.runJob("RUN_TASK", { name: payload.name }, this)
  72. .then(() => resolve())
  73. .catch(err => reject(err));
  74. } else resolve();
  75. });
  76. }
  77. /**
  78. * Pauses a task
  79. *
  80. * @param {object} payload - object that contains the payload
  81. * @param {string} payload.taskName - the name of the task to pause
  82. * @returns {Promise} - returns promise (reject, resolve)
  83. */
  84. PAUSE_TASK(payload) {
  85. const taskName = { payload };
  86. return new Promise(resolve => {
  87. if (TasksModule.tasks[taskName].timer) TasksModule.tasks[taskName].timer.pause();
  88. resolve();
  89. });
  90. }
  91. /**
  92. * Resumes a task
  93. *
  94. * @param {object} payload - object that contains the payload
  95. * @param {string} payload.name - the name of the task to resume
  96. * @returns {Promise} - returns promise (reject, resolve)
  97. */
  98. RESUME_TASK(payload) {
  99. return new Promise(resolve => {
  100. TasksModule.tasks[payload.name].timer.resume();
  101. resolve();
  102. });
  103. }
  104. /**
  105. * Runs a task's function and restarts the timer
  106. *
  107. * @param {object} payload - object that contains the payload
  108. * @param {string} payload.name - the name of the task to run
  109. * @returns {Promise} - returns promise (reject, resolve)
  110. */
  111. RUN_TASK(payload) {
  112. return new Promise(resolve => {
  113. const task = TasksModule.tasks[payload.name];
  114. if (task.timer) task.timer.pause();
  115. task.fn.apply(this).then(() => {
  116. task.lastRan = Date.now();
  117. task.timer = new Timer(
  118. () => TasksModule.runJob("RUN_TASK", { name: payload.name }),
  119. task.timeout,
  120. false
  121. );
  122. resolve();
  123. });
  124. });
  125. }
  126. /**
  127. * Periodically checks if any stations need to be skipped
  128. *
  129. * @returns {Promise} - returns promise (reject, resolve)
  130. */
  131. checkStationSkipTask() {
  132. return new Promise(resolve => {
  133. TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
  134. async.waterfall(
  135. [
  136. next => {
  137. CacheModule.runJob("HGETALL", { table: "stations" })
  138. .then(response => next(null, response))
  139. .catch(next);
  140. },
  141. (stations, next) => {
  142. async.each(
  143. stations,
  144. (station, next2) => {
  145. if (station.paused || !station.currentSong || !station.currentSong.title)
  146. return next2();
  147. const timeElapsed = Date.now() - station.startedAt - station.timePaused;
  148. if (timeElapsed <= station.currentSong.duration) return next2();
  149. TasksModule.log(
  150. "ERROR",
  151. "TASK_STATIONS_SKIP_CHECK",
  152. `Skipping ${station._id} as it should have skipped already.`
  153. );
  154. return StationsModule.runJob("INITIALIZE_STATION", {
  155. stationId: station._id
  156. }).then(() => next2());
  157. },
  158. () => next()
  159. );
  160. }
  161. ],
  162. () => resolve()
  163. );
  164. });
  165. }
  166. /**
  167. * Periodically checks if any sessions are out of date and need to be cleared
  168. *
  169. * @returns {Promise} - returns promise (reject, resolve)
  170. */
  171. sessionClearingTask() {
  172. return new Promise(resolve => {
  173. TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);
  174. async.waterfall(
  175. [
  176. next => {
  177. CacheModule.runJob("HGETALL", { table: "sessions" })
  178. .then(sessions => next(null, sessions))
  179. .catch(next);
  180. },
  181. (sessions, next) => {
  182. if (!sessions) return next();
  183. const keys = Object.keys(sessions);
  184. return async.each(
  185. keys,
  186. (sessionId, next2) => {
  187. const session = sessions[sessionId];
  188. if (
  189. session &&
  190. session.refreshDate &&
  191. Date.now() - session.refreshDate < 60 * 60 * 24 * 30 * 1000
  192. )
  193. return next2();
  194. if (!session) {
  195. TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
  196. return CacheModule.runJob("HDEL", {
  197. table: "sessions",
  198. key: sessionId
  199. }).finally(() => {
  200. next2();
  201. });
  202. }
  203. if (!session.refreshDate) {
  204. session.refreshDate = Date.now();
  205. return CacheModule.runJob("HSET", {
  206. table: "sessions",
  207. key: sessionId,
  208. value: session
  209. }).finally(() => next2());
  210. }
  211. if (Date.now() - session.refreshDate > 60 * 60 * 24 * 30 * 1000) {
  212. return WSModule.runJob("SOCKETS_FROM_SESSION_ID", {
  213. sessionId: session.sessionId
  214. }).then(response => {
  215. if (response.sockets.length > 0) {
  216. session.refreshDate = Date.now();
  217. CacheModule.runJob("HSET", {
  218. table: "sessions",
  219. key: sessionId,
  220. value: session
  221. }).finally(() => {
  222. next2();
  223. });
  224. } else {
  225. TasksModule.log(
  226. "INFO",
  227. "TASK_SESSION_CLEAR",
  228. `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
  229. );
  230. CacheModule.runJob("HDEL", {
  231. table: "sessions",
  232. key: session.sessionId
  233. }).finally(() => next2());
  234. }
  235. });
  236. }
  237. TasksModule.log("ERROR", "TASK_SESSION_CLEAR", "This should never log.");
  238. return next2();
  239. },
  240. () => next()
  241. );
  242. }
  243. ],
  244. () => resolve()
  245. );
  246. });
  247. }
  248. /**
  249. * Periodically warns about the size of any log files
  250. *
  251. * @returns {Promise} - returns promise (reject, resolve)
  252. */
  253. logFileSizeCheckTask() {
  254. return new Promise((resolve, reject) => {
  255. TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);
  256. async.each(
  257. ["all.log", "debugStation.log", "error.log", "info.log", "success.log"],
  258. (fileName, next) => {
  259. try {
  260. const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));
  261. const mb = stats.size / 1000000;
  262. if (mb > 25) return next(true);
  263. return next();
  264. } catch (err) {
  265. return next(err);
  266. }
  267. },
  268. async err => {
  269. if (err && err !== true) {
  270. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  271. return reject(new Error(err));
  272. }
  273. if (err === true) {
  274. TasksModule.log(
  275. "ERROR",
  276. "LOGGER_FILE_SIZE_WARNING",
  277. "************************************WARNING*************************************"
  278. );
  279. TasksModule.log(
  280. "ERROR",
  281. "LOGGER_FILE_SIZE_WARNING",
  282. "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"
  283. );
  284. TasksModule.log(
  285. "ERROR",
  286. "LOGGER_FILE_SIZE_WARNING",
  287. "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"
  288. );
  289. TasksModule.log(
  290. "ERROR",
  291. "LOGGER_FILE_SIZE_WARNING",
  292. "********************************************************************************"
  293. );
  294. }
  295. return resolve();
  296. }
  297. );
  298. });
  299. }
  300. /**
  301. * Periodically collect users in stations
  302. *
  303. * @returns {Promise} - returns promise (reject, resolve)
  304. */
  305. async collectStationUsersTask() {
  306. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  307. return new Promise(resolve => {
  308. TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);
  309. const stationsCountUpdated = [];
  310. const stationsUpdated = [];
  311. const oldUsersPerStation = StationsModule.usersPerStation;
  312. const usersPerStation = { loggedIn: [], loggedOut: [] };
  313. const oldUsersPerStationCount = JSON.parse(JSON.stringify(StationsModule.usersPerStationCount));
  314. const usersPerStationCount = {};
  315. async.each(
  316. Object.keys(StationsModule.userList),
  317. (socketId, next) => {
  318. WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }).then(async socket => {
  319. const stationId = StationsModule.userList[socketId];
  320. const room = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
  321. room: `station.${stationId}`
  322. });
  323. if (!socket || !room.includes(socketId)) {
  324. if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  325. if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(String(stationId));
  326. delete StationsModule.userList[socketId];
  327. return next();
  328. }
  329. if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0; // start count for station
  330. if (!usersPerStation[stationId]) usersPerStation[stationId] = { loggedIn: [], loggedOut: [] };
  331. return async.waterfall(
  332. [
  333. next => {
  334. if (!socket.session || !socket.session.sessionId) {
  335. return next("No session found.", { ip: socket.ip });
  336. }
  337. return CacheModule.runJob("HGET", {
  338. table: "sessions",
  339. key: socket.session.sessionId
  340. })
  341. .then(session => next(null, session))
  342. .catch(next);
  343. },
  344. (session, next) => {
  345. if (!session) return next("Session not found.");
  346. return userModel.findOne({ _id: session.userId }, next);
  347. },
  348. (user, next) => {
  349. if (!user) return next("User not found.");
  350. if (usersPerStation[stationId].loggedIn.some(u => user.username === u.username))
  351. return next("User already in the list.");
  352. usersPerStationCount[stationId] += 1; // increment user count for station
  353. return next(null, {
  354. username: user.username,
  355. name: user.name,
  356. avatar: user.avatar
  357. });
  358. }
  359. ],
  360. (err, user) => {
  361. if (!err) usersPerStation[stationId].loggedIn.push(user);
  362. // if user is logged out (an ip can only be counted once)
  363. if (
  364. err === "No session found." &&
  365. !usersPerStation[stationId].loggedOut.some(u => user.ip === u.ip)
  366. ) {
  367. usersPerStationCount[stationId] += 1; // increment user count for station
  368. usersPerStation[stationId].loggedOut.push(user);
  369. }
  370. next();
  371. }
  372. );
  373. });
  374. },
  375. () => {
  376. Object.keys(usersPerStationCount).forEach(stationId => {
  377. if (
  378. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId] &&
  379. stationsCountUpdated.indexOf(stationId) === -1
  380. ) {
  381. this.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  382. CacheModule.runJob("PUB", {
  383. channel: "station.updateUserCount",
  384. value: { stationId, usersPerStationCount: usersPerStationCount[stationId] }
  385. });
  386. }
  387. });
  388. Object.keys(usersPerStation).forEach(stationId => {
  389. if (
  390. !oldUsersPerStation[stationId] ||
  391. JSON.stringify(oldUsersPerStation[stationId]) !==
  392. JSON.stringify(usersPerStation[stationId]) ||
  393. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]
  394. ) {
  395. this.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  396. CacheModule.runJob("PUB", {
  397. channel: "station.updateUsers",
  398. value: { stationId, usersPerStation: usersPerStation[stationId] }
  399. });
  400. }
  401. });
  402. StationsModule.usersPerStationCount = usersPerStationCount;
  403. StationsModule.usersPerStation = usersPerStation;
  404. }
  405. );
  406. resolve();
  407. });
  408. }
  409. }
  410. export default new _TasksModule();