tasks.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. import async from "async";
  2. import fs from "fs";
  3. import path from "path";
  4. import { fileURLToPath } from "url";
  5. import CoreClass from "../core";
  6. import Timer from "../classes/Timer.class";
  7. const __dirname = path.dirname(fileURLToPath(import.meta.url));
  8. let TasksModule;
  9. let CacheModule;
  10. let StationsModule;
  11. let UtilsModule;
  12. let WSModule;
  13. let DBModule;
  14. const stationStateWorth = {
  15. unknown: 0,
  16. no_song: 1,
  17. station_paused: 2,
  18. participate: 3,
  19. local_paused: 4,
  20. muted: 5,
  21. unavailable: 6,
  22. buffering: 7,
  23. playing: 8
  24. };
  25. class _TasksModule extends CoreClass {
  26. // eslint-disable-next-line require-jsdoc
  27. constructor() {
  28. super("tasks");
  29. this.tasks = {};
  30. TasksModule = this;
  31. }
  32. /**
  33. * Initialises the tasks module
  34. *
  35. * @returns {Promise} - returns promise (reject, resolve)
  36. */
  37. initialize() {
  38. return new Promise(resolve => {
  39. CacheModule = this.moduleManager.modules.cache;
  40. StationsModule = this.moduleManager.modules.stations;
  41. UtilsModule = this.moduleManager.modules.utils;
  42. WSModule = this.moduleManager.modules.ws;
  43. DBModule = this.moduleManager.modules.db;
  44. TasksModule.runJob("CREATE_TASK", {
  45. name: "stationSkipTask",
  46. fn: TasksModule.checkStationSkipTask,
  47. timeout: 1000 * 60 * 30
  48. });
  49. TasksModule.runJob("CREATE_TASK", {
  50. name: "sessionClearTask",
  51. fn: TasksModule.sessionClearingTask,
  52. timeout: 1000 * 60 * 60 * 6
  53. });
  54. // TasksModule.runJob("CREATE_TASK", {
  55. // name: "logFileSizeCheckTask",
  56. // fn: TasksModule.logFileSizeCheckTask,
  57. // timeout: 1000 * 60 * 60
  58. // });
  59. TasksModule.runJob("CREATE_TASK", {
  60. name: "collectStationUsersTask",
  61. fn: TasksModule.collectStationUsersTask,
  62. timeout: 1000 * 3
  63. });
  64. // TasksModule.runJob("CREATE_TASK", {
  65. // name: "historyClearTask",
  66. // fn: TasksModule.historyClearTask,
  67. // timeout: 1000 * 60 * 60 * 6
  68. // });
  69. resolve();
  70. });
  71. }
  72. /**
  73. * Creates a new task
  74. *
  75. * @param {object} payload - object that contains the payload
  76. * @param {string} payload.name - the name of the task
  77. * @param {string} payload.fn - the function the task will run
  78. * @param {string} payload.paused - if the task is currently paused
  79. * @param {boolean} payload.timeout - how often to run the task
  80. * @returns {Promise} - returns promise (reject, resolve)
  81. */
  82. CREATE_TASK(payload) {
  83. return new Promise((resolve, reject) => {
  84. TasksModule.tasks[payload.name] = {
  85. name: payload.name,
  86. fn: payload.fn,
  87. timeout: payload.timeout,
  88. lastRan: 0,
  89. timer: null
  90. };
  91. if (!payload.paused) {
  92. TasksModule.runJob("RUN_TASK", { name: payload.name }, this)
  93. .then(() => resolve())
  94. .catch(err => reject(err));
  95. } else resolve();
  96. });
  97. }
  98. /**
  99. * Pauses a task
  100. *
  101. * @param {object} payload - object that contains the payload
  102. * @param {string} payload.taskName - the name of the task to pause
  103. * @returns {Promise} - returns promise (reject, resolve)
  104. */
  105. PAUSE_TASK(payload) {
  106. const taskName = { payload };
  107. return new Promise(resolve => {
  108. if (TasksModule.tasks[taskName].timer) TasksModule.tasks[taskName].timer.pause();
  109. resolve();
  110. });
  111. }
  112. /**
  113. * Resumes a task
  114. *
  115. * @param {object} payload - object that contains the payload
  116. * @param {string} payload.name - the name of the task to resume
  117. * @returns {Promise} - returns promise (reject, resolve)
  118. */
  119. RESUME_TASK(payload) {
  120. return new Promise(resolve => {
  121. TasksModule.tasks[payload.name].timer.resume();
  122. resolve();
  123. });
  124. }
  125. /**
  126. * Runs a task's function and restarts the timer
  127. *
  128. * @param {object} payload - object that contains the payload
  129. * @param {string} payload.name - the name of the task to run
  130. * @returns {Promise} - returns promise (reject, resolve)
  131. */
  132. RUN_TASK(payload) {
  133. return new Promise(resolve => {
  134. const task = TasksModule.tasks[payload.name];
  135. if (task.timer) task.timer.pause();
  136. task.fn.apply(this).then(() => {
  137. task.lastRan = Date.now();
  138. task.timer = new Timer(
  139. () => TasksModule.runJob("RUN_TASK", { name: payload.name }),
  140. task.timeout,
  141. false
  142. );
  143. resolve();
  144. });
  145. });
  146. }
  147. /**
  148. * Periodically checks if any stations need to be skipped
  149. *
  150. * @returns {Promise} - returns promise (reject, resolve)
  151. */
  152. checkStationSkipTask() {
  153. return new Promise(resolve => {
  154. TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
  155. async.waterfall(
  156. [
  157. next => {
  158. CacheModule.runJob("HGETALL", { table: "stations" })
  159. .then(response => next(null, response))
  160. .catch(next);
  161. },
  162. (stations, next) => {
  163. async.each(
  164. stations,
  165. (station, next2) => {
  166. if (station.paused || !station.currentSong || !station.currentSong.title)
  167. return next2();
  168. const timeElapsed = Date.now() - station.startedAt - station.timePaused;
  169. if (timeElapsed <= station.currentSong.duration) return next2();
  170. TasksModule.log(
  171. "ERROR",
  172. "TASK_STATIONS_SKIP_CHECK",
  173. `Skipping ${station._id} as it should have skipped already.`
  174. );
  175. return StationsModule.runJob("INITIALIZE_STATION", {
  176. stationId: station._id
  177. }).then(() => next2());
  178. },
  179. () => next()
  180. );
  181. }
  182. ],
  183. () => resolve()
  184. );
  185. });
  186. }
  187. /**
  188. * Periodically checks if any sessions are out of date and need to be cleared
  189. *
  190. * @returns {Promise} - returns promise (reject, resolve)
  191. */
  192. sessionClearingTask() {
  193. return new Promise(resolve => {
  194. TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);
  195. async.waterfall(
  196. [
  197. next => {
  198. CacheModule.runJob("HGETALL", { table: "sessions" })
  199. .then(sessions => next(null, sessions))
  200. .catch(next);
  201. },
  202. (sessions, next) => {
  203. if (!sessions) return next();
  204. const keys = Object.keys(sessions);
  205. return async.each(
  206. keys,
  207. (sessionId, next2) => {
  208. const session = sessions[sessionId];
  209. if (
  210. session &&
  211. session.refreshDate &&
  212. Date.now() - session.refreshDate < 60 * 60 * 24 * 30 * 1000
  213. )
  214. return next2();
  215. if (!session) {
  216. TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
  217. return CacheModule.runJob("HDEL", {
  218. table: "sessions",
  219. key: sessionId
  220. }).finally(() => {
  221. next2();
  222. });
  223. }
  224. if (!session.refreshDate) {
  225. session.refreshDate = Date.now();
  226. return CacheModule.runJob("HSET", {
  227. table: "sessions",
  228. key: sessionId,
  229. value: session
  230. }).finally(() => next2());
  231. }
  232. if (Date.now() - session.refreshDate > 60 * 60 * 24 * 30 * 1000) {
  233. return WSModule.runJob("SOCKETS_FROM_SESSION_ID", {
  234. sessionId: session.sessionId
  235. }).then(sockets => {
  236. if (sockets.length > 0) {
  237. session.refreshDate = Date.now();
  238. CacheModule.runJob("HSET", {
  239. table: "sessions",
  240. key: sessionId,
  241. value: session
  242. }).finally(() => {
  243. next2();
  244. });
  245. } else {
  246. TasksModule.log(
  247. "INFO",
  248. "TASK_SESSION_CLEAR",
  249. `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
  250. );
  251. CacheModule.runJob("HDEL", {
  252. table: "sessions",
  253. key: session.sessionId
  254. }).finally(() => next2());
  255. }
  256. });
  257. }
  258. TasksModule.log("ERROR", "TASK_SESSION_CLEAR", "This should never log.");
  259. return next2();
  260. },
  261. () => next()
  262. );
  263. }
  264. ],
  265. () => resolve()
  266. );
  267. });
  268. }
  269. /**
  270. * Periodically warns about the size of any log files
  271. *
  272. * @returns {Promise} - returns promise (reject, resolve)
  273. */
  274. logFileSizeCheckTask() {
  275. return new Promise((resolve, reject) => {
  276. TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);
  277. async.each(
  278. ["all.log", "debugStation.log", "error.log", "info.log", "success.log"],
  279. (fileName, next) => {
  280. try {
  281. const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));
  282. const mb = stats.size / 1000000;
  283. if (mb > 25) return next(true);
  284. return next();
  285. } catch (err) {
  286. return next(err);
  287. }
  288. },
  289. async err => {
  290. if (err && err !== true) {
  291. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  292. return reject(new Error(err));
  293. }
  294. if (err === true) {
  295. TasksModule.log(
  296. "ERROR",
  297. "LOGGER_FILE_SIZE_WARNING",
  298. "************************************WARNING*************************************"
  299. );
  300. TasksModule.log(
  301. "ERROR",
  302. "LOGGER_FILE_SIZE_WARNING",
  303. "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"
  304. );
  305. TasksModule.log(
  306. "ERROR",
  307. "LOGGER_FILE_SIZE_WARNING",
  308. "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"
  309. );
  310. TasksModule.log(
  311. "ERROR",
  312. "LOGGER_FILE_SIZE_WARNING",
  313. "********************************************************************************"
  314. );
  315. }
  316. return resolve();
  317. }
  318. );
  319. });
  320. }
  321. /**
  322. * Periodically collect users in stations
  323. *
  324. * @returns {Promise} - returns promise (reject, resolve)
  325. */
  326. async collectStationUsersTask() {
  327. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  328. return new Promise(resolve => {
  329. TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);
  330. const stationsCountUpdated = [];
  331. const stationsUpdated = [];
  332. const oldUsersPerStation = StationsModule.usersPerStation;
  333. const usersPerStation = { loggedIn: [], loggedOut: [] };
  334. const oldUsersPerStationCount = JSON.parse(JSON.stringify(StationsModule.usersPerStationCount));
  335. const usersPerStationCount = {};
  336. async.each(
  337. Object.keys(StationsModule.userList),
  338. (socketId, next) => {
  339. WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }).then(async socket => {
  340. const stationId = StationsModule.userList[socketId];
  341. const room = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
  342. room: `station.${stationId}`
  343. });
  344. if (!socket || !room.includes(socketId)) {
  345. if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  346. if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(String(stationId));
  347. delete StationsModule.userList[socketId];
  348. return next();
  349. }
  350. if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0; // start count for station
  351. if (!usersPerStation[stationId]) usersPerStation[stationId] = { loggedIn: [], loggedOut: [] };
  352. return async.waterfall(
  353. [
  354. next => {
  355. if (!socket.session || !socket.session.sessionId) {
  356. return next("No session found.", { ip: socket.ip });
  357. }
  358. return CacheModule.runJob("HGET", {
  359. table: "sessions",
  360. key: socket.session.sessionId
  361. })
  362. .then(session => next(null, session))
  363. .catch(next);
  364. },
  365. (session, next) => {
  366. if (!session) return next("Session not found.");
  367. return userModel.findOne({ _id: session.userId }, next);
  368. },
  369. (user, next) => {
  370. if (!user) return next("User not found.");
  371. const state = socket.session.stationState ?? "unknown";
  372. const existingUserObject = usersPerStation[stationId].loggedIn.findLast(
  373. u => user.username === u.username
  374. );
  375. if (existingUserObject) {
  376. if (stationStateWorth[state] > stationStateWorth[existingUserObject.state]) {
  377. usersPerStation[stationId].loggedIn[
  378. usersPerStation[stationId].loggedIn.indexOf(existingUserObject)
  379. ].state = state;
  380. }
  381. return next("User already in the list.");
  382. }
  383. usersPerStationCount[stationId] += 1; // increment user count for station
  384. return next(null, {
  385. _id: user._id,
  386. username: user.username,
  387. name: user.name,
  388. avatar: user.avatar,
  389. state
  390. });
  391. }
  392. ],
  393. (err, user) => {
  394. if (!err) usersPerStation[stationId].loggedIn.push(user);
  395. // if user is logged out (an ip can only be counted once)
  396. if (
  397. err === "No session found." &&
  398. !usersPerStation[stationId].loggedOut.some(u => user.ip === u.ip)
  399. ) {
  400. usersPerStationCount[stationId] += 1; // increment user count for station
  401. usersPerStation[stationId].loggedOut.push(user);
  402. }
  403. next();
  404. }
  405. );
  406. });
  407. },
  408. () => {
  409. Object.keys(usersPerStationCount).forEach(stationId => {
  410. if (
  411. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId] &&
  412. stationsCountUpdated.indexOf(stationId) === -1
  413. ) {
  414. this.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  415. CacheModule.runJob("PUB", {
  416. channel: "station.updateUserCount",
  417. value: { stationId, usersPerStationCount: usersPerStationCount[stationId] }
  418. });
  419. }
  420. });
  421. Object.keys(usersPerStation).forEach(stationId => {
  422. if (
  423. !oldUsersPerStation[stationId] ||
  424. JSON.stringify(oldUsersPerStation[stationId]) !==
  425. JSON.stringify(usersPerStation[stationId]) ||
  426. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]
  427. ) {
  428. this.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  429. CacheModule.runJob("PUB", {
  430. channel: "station.updateUsers",
  431. value: { stationId, usersPerStation: usersPerStation[stationId] }
  432. });
  433. }
  434. });
  435. StationsModule.usersPerStationCount = usersPerStationCount;
  436. StationsModule.usersPerStation = usersPerStation;
  437. }
  438. );
  439. resolve();
  440. });
  441. }
  442. /**
  443. * Periodically removes any old history documents
  444. *
  445. * @returns {Promise} - returns promise (reject, resolve)
  446. */
  447. async historyClearTask() {
  448. TasksModule.log("INFO", "TASK_HISTORY_CLEAR", `Removing old history.`);
  449. const stationHistoryModel = await DBModule.runJob("GET_MODEL", { modelName: "stationHistory" });
  450. // Remove documents created more than 2 days ago
  451. const mongoQuery = { "payload.skippedAt": { $lt: new Date(new Date().getTime() - 1000 * 60 * 60 * 24 * 2) } };
  452. const count = await stationHistoryModel.count(mongoQuery);
  453. await stationHistoryModel.remove(mongoQuery);
  454. TasksModule.log("SUCCESS", "TASK_HISTORY_CLEAR", `Removed ${count} history items`);
  455. }
  456. }
  457. export default new _TasksModule();