// tasks.js (15 KB)
  1. import async from "async";
  2. import fs from "fs";
  3. import path from "path";
  4. import { fileURLToPath } from "url";
  5. import CoreClass from "../core";
  6. import Timer from "../classes/Timer.class";
  // Directory that contains this module file, derived from the module URL.
  const __dirname = path.dirname(fileURLToPath(import.meta.url));

  // Module references. TasksModule is assigned in the constructor; the other
  // references are assigned from the module manager in initialize().
  let TasksModule;
  let CacheModule;
  let StationsModule;
  let UtilsModule;
  let WSModule;
  let DBModule;

  // Maps every station state to a numeric worth (its position in the list below).
  // collectStationUsersTask uses it to keep the highest-worth state when the same
  // user shows up more than once in a station.
  const stationStateWorth = Object.fromEntries(
    [
      "unknown",
      "no_song",
      "station_paused",
      "participate",
      "local_paused",
      "muted",
      "unavailable",
      "buffering",
      "playing"
    ].map((state, worth) => [state, worth])
  );
  25. class _TasksModule extends CoreClass {
  26. // eslint-disable-next-line require-jsdoc
  27. constructor() {
  28. super("tasks");
  29. this.tasks = {};
  30. TasksModule = this;
  31. }
  32. /**
  33. * Initialises the tasks module
  34. * @returns {Promise} - returns promise (reject, resolve)
  35. */
  36. initialize() {
  37. return new Promise(resolve => {
  38. CacheModule = this.moduleManager.modules.cache;
  39. StationsModule = this.moduleManager.modules.stations;
  40. UtilsModule = this.moduleManager.modules.utils;
  41. WSModule = this.moduleManager.modules.ws;
  42. DBModule = this.moduleManager.modules.db;
  43. TasksModule.runJob("CREATE_TASK", {
  44. name: "stationSkipTask",
  45. fn: TasksModule.checkStationSkipTask,
  46. timeout: 1000 * 60 * 30
  47. });
  48. TasksModule.runJob("CREATE_TASK", {
  49. name: "sessionClearTask",
  50. fn: TasksModule.sessionClearingTask,
  51. timeout: 1000 * 60 * 60 * 6
  52. });
  53. // TasksModule.runJob("CREATE_TASK", {
  54. // name: "logFileSizeCheckTask",
  55. // fn: TasksModule.logFileSizeCheckTask,
  56. // timeout: 1000 * 60 * 60
  57. // });
  58. TasksModule.runJob("CREATE_TASK", {
  59. name: "collectStationUsersTask",
  60. fn: TasksModule.collectStationUsersTask,
  61. timeout: 1000 * 3
  62. });
  63. // TasksModule.runJob("CREATE_TASK", {
  64. // name: "historyClearTask",
  65. // fn: TasksModule.historyClearTask,
  66. // timeout: 1000 * 60 * 60 * 6
  67. // });
  68. resolve();
  69. });
  70. }
  71. /**
  72. * Creates a new task
  73. * @param {object} payload - object that contains the payload
  74. * @param {string} payload.name - the name of the task
  75. * @param {string} payload.fn - the function the task will run
  76. * @param {string} payload.paused - if the task is currently paused
  77. * @param {boolean} payload.timeout - how often to run the task
  78. * @returns {Promise} - returns promise (reject, resolve)
  79. */
  80. CREATE_TASK(payload) {
  81. return new Promise((resolve, reject) => {
  82. TasksModule.tasks[payload.name] = {
  83. name: payload.name,
  84. fn: payload.fn,
  85. timeout: payload.timeout,
  86. lastRan: 0,
  87. timer: null
  88. };
  89. if (!payload.paused) {
  90. TasksModule.runJob("RUN_TASK", { name: payload.name }, this)
  91. .then(() => resolve())
  92. .catch(err => reject(err));
  93. } else resolve();
  94. });
  95. }
  96. /**
  97. * Pauses a task
  98. * @param {object} payload - object that contains the payload
  99. * @param {string} payload.taskName - the name of the task to pause
  100. * @returns {Promise} - returns promise (reject, resolve)
  101. */
  102. PAUSE_TASK(payload) {
  103. const taskName = { payload };
  104. return new Promise(resolve => {
  105. if (TasksModule.tasks[taskName].timer) TasksModule.tasks[taskName].timer.pause();
  106. resolve();
  107. });
  108. }
  109. /**
  110. * Resumes a task
  111. * @param {object} payload - object that contains the payload
  112. * @param {string} payload.name - the name of the task to resume
  113. * @returns {Promise} - returns promise (reject, resolve)
  114. */
  115. RESUME_TASK(payload) {
  116. return new Promise(resolve => {
  117. TasksModule.tasks[payload.name].timer.resume();
  118. resolve();
  119. });
  120. }
  121. /**
  122. * Runs a task's function and restarts the timer
  123. * @param {object} payload - object that contains the payload
  124. * @param {string} payload.name - the name of the task to run
  125. * @returns {Promise} - returns promise (reject, resolve)
  126. */
  127. RUN_TASK(payload) {
  128. return new Promise(resolve => {
  129. const task = TasksModule.tasks[payload.name];
  130. if (task.timer) task.timer.pause();
  131. task.fn.apply(this).then(() => {
  132. task.lastRan = Date.now();
  133. task.timer = new Timer(
  134. () => TasksModule.runJob("RUN_TASK", { name: payload.name }),
  135. task.timeout,
  136. false
  137. );
  138. resolve();
  139. });
  140. });
  141. }
  142. /**
  143. * Periodically checks if any stations need to be skipped
  144. * @returns {Promise} - returns promise (reject, resolve)
  145. */
  146. checkStationSkipTask() {
  147. return new Promise(resolve => {
  148. TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
  149. async.waterfall(
  150. [
  151. next => {
  152. CacheModule.runJob("HGETALL", { table: "stations" })
  153. .then(response => next(null, response))
  154. .catch(next);
  155. },
  156. (stations, next) => {
  157. async.each(
  158. stations,
  159. (station, next2) => {
  160. if (station.paused || !station.currentSong || !station.currentSong.title)
  161. return next2();
  162. const timeElapsed = Date.now() - station.startedAt - station.timePaused;
  163. if (timeElapsed <= station.currentSong.duration) return next2();
  164. TasksModule.log(
  165. "ERROR",
  166. "TASK_STATIONS_SKIP_CHECK",
  167. `Skipping ${station._id} as it should have skipped already.`
  168. );
  169. return StationsModule.runJob("INITIALIZE_STATION", {
  170. stationId: station._id
  171. }).then(() => next2());
  172. },
  173. () => next()
  174. );
  175. }
  176. ],
  177. () => resolve()
  178. );
  179. });
  180. }
  181. /**
  182. * Periodically checks if any sessions are out of date and need to be cleared
  183. * @returns {Promise} - returns promise (reject, resolve)
  184. */
  185. sessionClearingTask() {
  186. return new Promise(resolve => {
  187. TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);
  188. async.waterfall(
  189. [
  190. next => {
  191. CacheModule.runJob("HGETALL", { table: "sessions" })
  192. .then(sessions => next(null, sessions))
  193. .catch(next);
  194. },
  195. (sessions, next) => {
  196. if (!sessions) return next();
  197. const keys = Object.keys(sessions);
  198. return async.each(
  199. keys,
  200. (sessionId, next2) => {
  201. const session = sessions[sessionId];
  202. if (
  203. session &&
  204. session.refreshDate &&
  205. Date.now() - session.refreshDate < 60 * 60 * 24 * 30 * 1000
  206. )
  207. return next2();
  208. if (!session) {
  209. TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
  210. return CacheModule.runJob("HDEL", {
  211. table: "sessions",
  212. key: sessionId
  213. }).finally(() => {
  214. next2();
  215. });
  216. }
  217. if (!session.refreshDate) {
  218. session.refreshDate = Date.now();
  219. return CacheModule.runJob("HSET", {
  220. table: "sessions",
  221. key: sessionId,
  222. value: session
  223. }).finally(() => next2());
  224. }
  225. if (Date.now() - session.refreshDate > 60 * 60 * 24 * 30 * 1000) {
  226. return WSModule.runJob("SOCKETS_FROM_SESSION_ID", {
  227. sessionId: session.sessionId
  228. }).then(sockets => {
  229. if (sockets.length > 0) {
  230. session.refreshDate = Date.now();
  231. CacheModule.runJob("HSET", {
  232. table: "sessions",
  233. key: sessionId,
  234. value: session
  235. }).finally(() => {
  236. next2();
  237. });
  238. } else {
  239. TasksModule.log(
  240. "INFO",
  241. "TASK_SESSION_CLEAR",
  242. `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
  243. );
  244. CacheModule.runJob("HDEL", {
  245. table: "sessions",
  246. key: session.sessionId
  247. }).finally(() => next2());
  248. }
  249. });
  250. }
  251. TasksModule.log("ERROR", "TASK_SESSION_CLEAR", "This should never log.");
  252. return next2();
  253. },
  254. () => next()
  255. );
  256. }
  257. ],
  258. () => resolve()
  259. );
  260. });
  261. }
  262. /**
  263. * Periodically warns about the size of any log files
  264. * @returns {Promise} - returns promise (reject, resolve)
  265. */
  266. logFileSizeCheckTask() {
  267. return new Promise((resolve, reject) => {
  268. TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);
  269. async.each(
  270. ["all.log", "debugStation.log", "error.log", "info.log", "success.log"],
  271. (fileName, next) => {
  272. try {
  273. const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));
  274. const mb = stats.size / 1000000;
  275. if (mb > 25) return next(true);
  276. return next();
  277. } catch (err) {
  278. return next(err);
  279. }
  280. },
  281. async err => {
  282. if (err && err !== true) {
  283. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  284. return reject(new Error(err));
  285. }
  286. if (err === true) {
  287. TasksModule.log(
  288. "ERROR",
  289. "LOGGER_FILE_SIZE_WARNING",
  290. "************************************WARNING*************************************"
  291. );
  292. TasksModule.log(
  293. "ERROR",
  294. "LOGGER_FILE_SIZE_WARNING",
  295. "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"
  296. );
  297. TasksModule.log(
  298. "ERROR",
  299. "LOGGER_FILE_SIZE_WARNING",
  300. "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"
  301. );
  302. TasksModule.log(
  303. "ERROR",
  304. "LOGGER_FILE_SIZE_WARNING",
  305. "********************************************************************************"
  306. );
  307. }
  308. return resolve();
  309. }
  310. );
  311. });
  312. }
  313. /**
  314. * Periodically collect users in stations
  315. * @returns {Promise} - returns promise (reject, resolve)
  316. */
  317. async collectStationUsersTask() {
  318. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  319. return new Promise(resolve => {
  320. TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);
  321. const stationsCountUpdated = [];
  322. const stationsUpdated = [];
  323. const oldUsersPerStation = StationsModule.usersPerStation;
  324. const usersPerStation = { loggedIn: [], loggedOut: [] };
  325. const oldUsersPerStationCount = JSON.parse(JSON.stringify(StationsModule.usersPerStationCount));
  326. const usersPerStationCount = {};
  327. async.each(
  328. Object.keys(StationsModule.userList),
  329. (socketId, next) => {
  330. WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }).then(async socket => {
  331. const stationId = StationsModule.userList[socketId];
  332. const room = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
  333. room: `station.${stationId}`
  334. });
  335. if (!socket || !room.includes(socketId)) {
  336. if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  337. if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(String(stationId));
  338. delete StationsModule.userList[socketId];
  339. return next();
  340. }
  341. if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0; // start count for station
  342. if (!usersPerStation[stationId]) usersPerStation[stationId] = { loggedIn: [], loggedOut: [] };
  343. return async.waterfall(
  344. [
  345. next => {
  346. if (!socket.session || !socket.session.sessionId) {
  347. return next("No session found.", { ip: socket.ip });
  348. }
  349. return CacheModule.runJob("HGET", {
  350. table: "sessions",
  351. key: socket.session.sessionId
  352. })
  353. .then(session => next(null, session))
  354. .catch(next);
  355. },
  356. (session, next) => {
  357. if (!session) return next("Session not found.");
  358. return userModel.findOne({ _id: session.userId }, next);
  359. },
  360. (user, next) => {
  361. if (!user) return next("User not found.");
  362. const state = socket.session.stationState ?? "unknown";
  363. const existingUserObject = usersPerStation[stationId].loggedIn.findLast(
  364. u => user.username === u.username
  365. );
  366. if (existingUserObject) {
  367. if (stationStateWorth[state] > stationStateWorth[existingUserObject.state]) {
  368. usersPerStation[stationId].loggedIn[
  369. usersPerStation[stationId].loggedIn.indexOf(existingUserObject)
  370. ].state = state;
  371. }
  372. return next("User already in the list.");
  373. }
  374. usersPerStationCount[stationId] += 1; // increment user count for station
  375. return next(null, {
  376. _id: user._id,
  377. username: user.username,
  378. name: user.name,
  379. avatar: user.avatar,
  380. state
  381. });
  382. }
  383. ],
  384. (err, user) => {
  385. if (!err) usersPerStation[stationId].loggedIn.push(user);
  386. // if user is logged out (an ip can only be counted once)
  387. if (
  388. err === "No session found." &&
  389. !usersPerStation[stationId].loggedOut.some(u => user.ip === u.ip)
  390. ) {
  391. usersPerStationCount[stationId] += 1; // increment user count for station
  392. usersPerStation[stationId].loggedOut.push(user);
  393. }
  394. next();
  395. }
  396. );
  397. });
  398. },
  399. () => {
  400. Object.keys(usersPerStationCount).forEach(stationId => {
  401. if (
  402. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId] &&
  403. stationsCountUpdated.indexOf(stationId) === -1
  404. ) {
  405. this.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  406. CacheModule.runJob("PUB", {
  407. channel: "station.updateUserCount",
  408. value: { stationId, usersPerStationCount: usersPerStationCount[stationId] }
  409. });
  410. }
  411. });
  412. Object.keys(usersPerStation).forEach(stationId => {
  413. if (
  414. !oldUsersPerStation[stationId] ||
  415. JSON.stringify(oldUsersPerStation[stationId]) !==
  416. JSON.stringify(usersPerStation[stationId]) ||
  417. oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]
  418. ) {
  419. this.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  420. CacheModule.runJob("PUB", {
  421. channel: "station.updateUsers",
  422. value: { stationId, usersPerStation: usersPerStation[stationId] }
  423. });
  424. }
  425. });
  426. StationsModule.usersPerStationCount = usersPerStationCount;
  427. StationsModule.usersPerStation = usersPerStation;
  428. }
  429. );
  430. resolve();
  431. });
  432. }
  433. /**
  434. * Periodically removes any old history documents
  435. * @returns {Promise} - returns promise (reject, resolve)
  436. */
  437. async historyClearTask() {
  438. TasksModule.log("INFO", "TASK_HISTORY_CLEAR", `Removing old history.`);
  439. const stationHistoryModel = await DBModule.runJob("GET_MODEL", { modelName: "stationHistory" });
  440. // Remove documents created more than 2 days ago
  441. const mongoQuery = { "payload.skippedAt": { $lt: new Date(new Date().getTime() - 1000 * 60 * 60 * 24 * 2) } };
  442. const count = await stationHistoryModel.count(mongoQuery);
  443. await stationHistoryModel.remove(mongoQuery);
  444. TasksModule.log("SUCCESS", "TASK_HISTORY_CLEAR", `Removed ${count} history items`);
  445. }
  446. }
  // A single instance of the module is created when this file is loaded and is
  // shared by every importer.
  const tasksModuleInstance = new _TasksModule();

  export default tasksModuleInstance;