tasks.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. import async from "async";
  2. import fs from "fs";
  3. import path from "path";
  4. import { fileURLToPath } from "url";
  5. import CoreClass from "../core";
  6. import Timer from "../classes/Timer.class";
  // Absolute path of the directory that contains this file (ES modules have no built-in __dirname).
  const __dirname = path.dirname(fileURLToPath(import.meta.url));

  // References to this module and its sibling modules.
  // TasksModule is assigned in the _TasksModule constructor; the others in _TasksModule#initialize.
  let TasksModule;
  let CacheModule;
  let StationsModule;
  let UtilsModule;
  let WSModule;
  let DBModule;

  // Ranking of the station states a socket can report. When the same user is connected through
  // several sockets, collectStationUsersTask keeps the state with the highest worth.
  // Frozen so an accidental in-place edit cannot silently skew those comparisons.
  const stationStateWorth = Object.freeze({
    unknown: 0,
    no_song: 1,
    station_paused: 2,
    participate: 3,
    local_paused: 4,
    muted: 5,
    buffering: 6,
    playing: 7
  });
  /**
   * Module that registers and runs the recurring background tasks of the application
   * (station skip checks, session clean-up, station user collection, history clean-up).
   *
   * NOTE(review): task functions are invoked from RUN_TASK with `task.fn.apply(this)`, so inside a
   * task function `this` is whatever RUN_TASK itself was called with - not necessarily this module
   * instance. Task functions therefore reach the module through the file-level `TasksModule` binding.
   */
  class _TasksModule extends CoreClass {
    // eslint-disable-next-line require-jsdoc
    constructor() {
      super("tasks");

      // Registered tasks, keyed by task name
      this.tasks = {};

      TasksModule = this;
    }

    /**
     * Initialises the tasks module: resolves the sibling modules and registers the recurring tasks.
     * Task creation is started but not awaited, so initialisation does not wait for the first runs.
     *
     * @returns {Promise} - returns promise (reject, resolve)
     */
    async initialize() {
      CacheModule = this.moduleManager.modules.cache;
      StationsModule = this.moduleManager.modules.stations;
      UtilsModule = this.moduleManager.modules.utils;
      WSModule = this.moduleManager.modules.ws;
      DBModule = this.moduleManager.modules.db;

      const recurringTasks = [
        { name: "stationSkipTask", fn: TasksModule.checkStationSkipTask, timeout: 1000 * 60 * 30 },
        { name: "sessionClearTask", fn: TasksModule.sessionClearingTask, timeout: 1000 * 60 * 60 * 6 },
        // Currently disabled:
        // { name: "logFileSizeCheckTask", fn: TasksModule.logFileSizeCheckTask, timeout: 1000 * 60 * 60 },
        { name: "collectStationUsersTask", fn: TasksModule.collectStationUsersTask, timeout: 1000 * 3 },
        { name: "historyClearTask", fn: TasksModule.historyClearTask, timeout: 1000 * 60 * 60 * 6 }
      ];

      recurringTasks.forEach(task => {
        // Fire-and-forget, but never leave a rejection unhandled
        TasksModule.runJob("CREATE_TASK", task).catch(err =>
          TasksModule.log("ERROR", "TASKS_INITIALIZE", `Failed to create task "${task.name}". "${err}"`)
        );
      });
    }

    /**
     * Creates a new task and, unless it is created paused, runs it immediately
     *
     * @param {object} payload - object that contains the payload
     * @param {string} payload.name - the name of the task
     * @param {Function} payload.fn - the function the task will run, must return a promise
     * @param {boolean} [payload.paused] - if the task should be created without being run
     * @param {number} payload.timeout - how often to run the task, in milliseconds
     * @returns {Promise} - returns promise (reject, resolve)
     */
    async CREATE_TASK(payload) {
      // A task registered again under the same name would otherwise leave its old timer running
      const previousTask = TasksModule.tasks[payload.name];
      if (previousTask && previousTask.timer) previousTask.timer.pause();

      TasksModule.tasks[payload.name] = {
        name: payload.name,
        fn: payload.fn,
        timeout: payload.timeout,
        lastRan: 0,
        timer: null
      };

      if (!payload.paused) await TasksModule.runJob("RUN_TASK", { name: payload.name }, this);
    }

    /**
     * Pauses a task
     *
     * @param {object} payload - object that contains the payload
     * @param {string} payload.taskName - the name of the task to pause (payload.name is accepted as well)
     * @returns {Promise} - returns promise (reject, resolve), rejects if the task does not exist
     */
    async PAUSE_TASK(payload) {
      const taskName = payload.taskName ?? payload.name;
      const task = TasksModule.tasks[taskName];
      if (!task) throw new Error(`Task "${taskName}" does not exist.`);

      // A task that has never run has no timer yet
      if (task.timer) task.timer.pause();
    }

    /**
     * Resumes a task
     *
     * @param {object} payload - object that contains the payload
     * @param {string} payload.name - the name of the task to resume (payload.taskName is accepted as well)
     * @returns {Promise} - returns promise (reject, resolve), rejects if the task does not exist
     */
    async RESUME_TASK(payload) {
      const taskName = payload.name ?? payload.taskName;
      const task = TasksModule.tasks[taskName];
      if (!task) throw new Error(`Task "${taskName}" does not exist.`);

      // A task that was created paused has no timer to resume
      if (task.timer) task.timer.resume();
    }

    /**
     * Runs a task's function and restarts the timer.
     * A failing task function is logged and the timer is still restarted, so one bad run
     * does not stop the task from ever running again.
     *
     * @param {object} payload - object that contains the payload
     * @param {string} payload.name - the name of the task to run
     * @returns {Promise} - returns promise (reject, resolve), rejects if the task does not exist
     */
    async RUN_TASK(payload) {
      const task = TasksModule.tasks[payload.name];
      if (!task) throw new Error(`Task "${payload.name}" does not exist.`);

      if (task.timer) task.timer.pause();

      try {
        await task.fn.apply(this);
      } catch (err) {
        TasksModule.log("ERROR", "RUN_TASK", `Task "${payload.name}" failed. "${err}"`);
      }

      task.lastRan = Date.now();
      // The third argument is passed exactly as before; presumably "not paused" - confirm against Timer
      task.timer = new Timer(() => TasksModule.runJob("RUN_TASK", { name: payload.name }), task.timeout, false);
    }

    /**
     * Periodically checks if any stations need to be skipped.
     * A station qualifies when it is not paused, has a current song with a title, and more time has
     * elapsed (excluding paused time) than the song's duration. Such stations are re-initialised.
     *
     * @returns {Promise} - returns promise (reject, resolve)
     */
    async checkStationSkipTask() {
      TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);

      let stations;
      try {
        stations = await CacheModule.runJob("HGETALL", { table: "stations" });
      } catch (err) {
        TasksModule.log("ERROR", "TASK_STATIONS_SKIP_CHECK", `Failed to load stations. "${err}"`);
        return;
      }
      if (!stations) return;

      await Promise.all(
        Object.values(stations).map(async station => {
          if (!station || station.paused || !station.currentSong || !station.currentSong.title) return;

          const timeElapsed = Date.now() - station.startedAt - station.timePaused;
          if (timeElapsed <= station.currentSong.duration) return;

          TasksModule.log(
            "ERROR",
            "TASK_STATIONS_SKIP_CHECK",
            `Skipping ${station._id} as it should have skipped already.`
          );

          try {
            await StationsModule.runJob("INITIALIZE_STATION", { stationId: station._id });
          } catch (err) {
            // One failing station must not keep the whole task from completing
            TasksModule.log(
              "ERROR",
              "TASK_STATIONS_SKIP_CHECK",
              `Failed to initialize station ${station._id}. "${err}"`
            );
          }
        })
      );
    }

    /**
     * Periodically checks if any sessions are out of date and need to be cleared.
     * Empty entries are removed, entries without a refresh date get one, and entries not refreshed
     * for 30 days are removed unless a socket is still using them (then they are refreshed instead).
     *
     * @returns {Promise} - returns promise (reject, resolve)
     */
    async sessionClearingTask() {
      TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);

      const maxSessionAge = 60 * 60 * 24 * 30 * 1000; // 30 days in milliseconds

      let sessions;
      try {
        sessions = await CacheModule.runJob("HGETALL", { table: "sessions" });
      } catch (err) {
        TasksModule.log("ERROR", "TASK_SESSION_CLEAR", `Failed to load sessions. "${err}"`);
        return;
      }
      if (!sessions) return;

      await Promise.all(
        Object.keys(sessions).map(async sessionId => {
          const session = sessions[sessionId];

          try {
            if (!session) {
              TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
              await CacheModule.runJob("HDEL", { table: "sessions", key: sessionId });
              return;
            }

            if (!session.refreshDate) {
              session.refreshDate = Date.now();
              await CacheModule.runJob("HSET", { table: "sessions", key: sessionId, value: session });
              return;
            }

            // Still fresh, nothing to do
            if (Date.now() - session.refreshDate < maxSessionAge) return;

            const sockets = await WSModule.runJob("SOCKETS_FROM_SESSION_ID", {
              sessionId: session.sessionId
            });

            if (sockets.length > 0) {
              // The session is in use right now, so extend it instead of removing it
              session.refreshDate = Date.now();
              await CacheModule.runJob("HSET", { table: "sessions", key: sessionId, value: session });
              return;
            }

            TasksModule.log(
              "INFO",
              "TASK_SESSION_CLEAR",
              `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
            );
            // Delete by the hash key that was iterated, which is the entry that was just inspected
            await CacheModule.runJob("HDEL", { table: "sessions", key: sessionId });
          } catch (err) {
            // One failing session must not keep the whole task from completing
            TasksModule.log("ERROR", "TASK_SESSION_CLEAR", `Failed to process session ${sessionId}. "${err}"`);
          }
        })
      );
    }

    /**
     * Periodically warns about the size of any log files.
     * Logs a warning banner when a log file is larger than 25MB (1MB counted as 1,000,000 bytes).
     *
     * @returns {Promise} - returns promise (reject, resolve), rejects if a log file can not be inspected
     */
    async logFileSizeCheckTask() {
      TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);

      const logFiles = ["all.log", "debugStation.log", "error.log", "info.log", "success.log"];

      let hasOversizedFile;
      try {
        // Files are checked in order; the first oversized file or the first failure decides the outcome
        hasOversizedFile = logFiles.some(fileName => {
          const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));
          return stats.size / 1000000 > 25;
        });
      } catch (err) {
        const message = await UtilsModule.runJob("GET_ERROR", { error: err });
        throw new Error(message);
      }

      if (!hasOversizedFile) return;

      [
        "************************************WARNING*************************************",
        "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************",
        "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****",
        "********************************************************************************"
      ].forEach(line => TasksModule.log("ERROR", "LOGGER_FILE_SIZE_WARNING", line));
    }

    /**
     * Periodically collect users in stations.
     * Walks every socket in StationsModule.userList, drops sockets that no longer exist or are no
     * longer in their station room, and rebuilds the per-station user lists and user counts.
     * Changes are published on "station.updateUserCount" and "station.updateUsers".
     * The returned promise settles only after the collection has finished.
     *
     * @returns {Promise} - returns promise (reject, resolve)
     */
    async collectStationUsersTask() {
      const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });

      TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);

      // Station ids (as strings) from which at least one socket was removed during this run
      const stationsWithRemovedSockets = new Set();

      const oldUsersPerStation = StationsModule.usersPerStation ?? {};
      const oldUsersPerStationCount = { ...StationsModule.usersPerStationCount };

      // Both maps are keyed by station id only
      const usersPerStation = {};
      const usersPerStationCount = {};

      await Promise.all(
        Object.keys(StationsModule.userList).map(async socketId => {
          try {
            const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId });
            const stationId = StationsModule.userList[socketId];
            const room = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: `station.${stationId}` });

            if (!socket || !room.includes(socketId)) {
              stationsWithRemovedSockets.add(String(stationId));
              delete StationsModule.userList[socketId];
              return;
            }

            if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0; // start count for station
            if (!usersPerStation[stationId]) usersPerStation[stationId] = { loggedIn: [], loggedOut: [] };
            const stationUsers = usersPerStation[stationId];

            // if user is logged out (an ip can only be counted once)
            if (!socket.session || !socket.session.sessionId) {
              if (!stationUsers.loggedOut.some(u => u.ip === socket.ip)) {
                usersPerStationCount[stationId] += 1; // increment user count for station
                stationUsers.loggedOut.push({ ip: socket.ip });
              }
              return;
            }

            const session = await CacheModule.runJob("HGET", {
              table: "sessions",
              key: socket.session.sessionId
            });
            if (!session) return;

            const user = await userModel.findOne({ _id: session.userId });
            if (!user) return;

            // From here to the push below there is no await, so two sockets of the same user
            // can not both pass the "already in the list" check.
            const state = socket.session.stationState ?? "unknown";
            const existingUserObject = stationUsers.loggedIn.findLast(u => user.username === u.username);

            if (existingUserObject) {
              // Same user on another socket: keep the state that is worth the most
              if (stationStateWorth[state] > stationStateWorth[existingUserObject.state])
                existingUserObject.state = state;
              return;
            }

            usersPerStationCount[stationId] += 1; // increment user count for station
            stationUsers.loggedIn.push({
              _id: user._id,
              username: user.username,
              name: user.name,
              avatar: user.avatar,
              state
            });
          } catch (err) {
            // One failing socket must not keep the whole task from completing
            TasksModule.log(
              "ERROR",
              "TASK_COLLECT_STATION_USERS_TASK",
              `Failed to process socket ${socketId}. "${err}"`
            );
          }
        })
      );

      // Publishing is fire-and-forget, but a failure is logged instead of left unhandled
      const publish = (channel, value) =>
        CacheModule.runJob("PUB", { channel, value }).catch(err =>
          TasksModule.log("ERROR", "TASK_COLLECT_STATION_USERS_TASK", `Failed to publish to ${channel}. "${err}"`)
        );

      Object.keys(usersPerStationCount).forEach(stationId => {
        if (
          oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId] &&
          !stationsWithRemovedSockets.has(stationId)
        ) {
          this.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
          publish("station.updateUserCount", {
            stationId,
            usersPerStationCount: usersPerStationCount[stationId]
          });
        }
      });

      Object.keys(usersPerStation).forEach(stationId => {
        if (
          !oldUsersPerStation[stationId] ||
          JSON.stringify(oldUsersPerStation[stationId]) !== JSON.stringify(usersPerStation[stationId]) ||
          oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]
        ) {
          this.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
          publish("station.updateUsers", { stationId, usersPerStation: usersPerStation[stationId] });
        }
      });

      StationsModule.usersPerStationCount = usersPerStationCount;
      StationsModule.usersPerStation = usersPerStation;
    }

    /**
     * Periodically removes any old history documents
     *
     * @returns {Promise} - returns promise (reject, resolve)
     */
    async historyClearTask() {
      TasksModule.log("INFO", "TASK_HISTORY_CLEAR", `Removing old history.`);

      const stationHistoryModel = await DBModule.runJob("GET_MODEL", { modelName: "stationHistory" });

      // Remove documents whose payload.skippedAt is more than 2 days ago
      const twoDaysAgo = new Date(Date.now() - 1000 * 60 * 60 * 24 * 2);
      const mongoQuery = { "payload.skippedAt": { $lt: twoDaysAgo } };

      // NOTE(review): if this is a Mongoose model, count()/remove() are deprecated in newer versions
      // in favour of countDocuments()/deleteMany() - confirm the installed version before switching.
      const count = await stationHistoryModel.count(mongoQuery);
      await stationHistoryModel.remove(mongoQuery);

      TasksModule.log("SUCCESS", "TASK_HISTORY_CLEAR", `Removed ${count} history items`);
    }
  }
  // The module is a singleton: it is instantiated once here and that instance is the default export.
  const tasksModuleInstance = new _TasksModule();

  export default tasksModuleInstance;