123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522 |
- import async from "async";
- import fs from "fs";
- import path from "path";
- import { fileURLToPath } from "url";
- import CoreClass from "../core";
- import Timer from "../classes/Timer.class";
- const __dirname = path.dirname(fileURLToPath(import.meta.url));
- let TasksModule;
- let CacheModule;
- let StationsModule;
- let UtilsModule;
- let WSModule;
- let DBModule;
- const stationStateWorth = {
- unknown: 0,
- no_song: 1,
- station_paused: 2,
- participate: 3,
- local_paused: 4,
- muted: 5,
- unavailable: 6,
- buffering: 7,
- playing: 8
- };
- class _TasksModule extends CoreClass {
- // eslint-disable-next-line require-jsdoc
- constructor() {
- super("tasks");
- this.tasks = {};
- TasksModule = this;
- }
- /**
- * Initialises the tasks module
- *
- * @returns {Promise} - returns promise (reject, resolve)
- */
- initialize() {
- return new Promise(resolve => {
- CacheModule = this.moduleManager.modules.cache;
- StationsModule = this.moduleManager.modules.stations;
- UtilsModule = this.moduleManager.modules.utils;
- WSModule = this.moduleManager.modules.ws;
- DBModule = this.moduleManager.modules.db;
- TasksModule.runJob("CREATE_TASK", {
- name: "stationSkipTask",
- fn: TasksModule.checkStationSkipTask,
- timeout: 1000 * 60 * 30
- });
- TasksModule.runJob("CREATE_TASK", {
- name: "sessionClearTask",
- fn: TasksModule.sessionClearingTask,
- timeout: 1000 * 60 * 60 * 6
- });
- // TasksModule.runJob("CREATE_TASK", {
- // name: "logFileSizeCheckTask",
- // fn: TasksModule.logFileSizeCheckTask,
- // timeout: 1000 * 60 * 60
- // });
- TasksModule.runJob("CREATE_TASK", {
- name: "collectStationUsersTask",
- fn: TasksModule.collectStationUsersTask,
- timeout: 1000 * 3
- });
- // TasksModule.runJob("CREATE_TASK", {
- // name: "historyClearTask",
- // fn: TasksModule.historyClearTask,
- // timeout: 1000 * 60 * 60 * 6
- // });
- resolve();
- });
- }
- /**
- * Creates a new task
- *
- * @param {object} payload - object that contains the payload
- * @param {string} payload.name - the name of the task
- * @param {string} payload.fn - the function the task will run
- * @param {string} payload.paused - if the task is currently paused
- * @param {boolean} payload.timeout - how often to run the task
- * @returns {Promise} - returns promise (reject, resolve)
- */
- CREATE_TASK(payload) {
- return new Promise((resolve, reject) => {
- TasksModule.tasks[payload.name] = {
- name: payload.name,
- fn: payload.fn,
- timeout: payload.timeout,
- lastRan: 0,
- timer: null
- };
- if (!payload.paused) {
- TasksModule.runJob("RUN_TASK", { name: payload.name }, this)
- .then(() => resolve())
- .catch(err => reject(err));
- } else resolve();
- });
- }
- /**
- * Pauses a task
- *
- * @param {object} payload - object that contains the payload
- * @param {string} payload.taskName - the name of the task to pause
- * @returns {Promise} - returns promise (reject, resolve)
- */
- PAUSE_TASK(payload) {
- const taskName = { payload };
- return new Promise(resolve => {
- if (TasksModule.tasks[taskName].timer) TasksModule.tasks[taskName].timer.pause();
- resolve();
- });
- }
- /**
- * Resumes a task
- *
- * @param {object} payload - object that contains the payload
- * @param {string} payload.name - the name of the task to resume
- * @returns {Promise} - returns promise (reject, resolve)
- */
- RESUME_TASK(payload) {
- return new Promise(resolve => {
- TasksModule.tasks[payload.name].timer.resume();
- resolve();
- });
- }
- /**
- * Runs a task's function and restarts the timer
- *
- * @param {object} payload - object that contains the payload
- * @param {string} payload.name - the name of the task to run
- * @returns {Promise} - returns promise (reject, resolve)
- */
- RUN_TASK(payload) {
- return new Promise(resolve => {
- const task = TasksModule.tasks[payload.name];
- if (task.timer) task.timer.pause();
- task.fn.apply(this).then(() => {
- task.lastRan = Date.now();
- task.timer = new Timer(
- () => TasksModule.runJob("RUN_TASK", { name: payload.name }),
- task.timeout,
- false
- );
- resolve();
- });
- });
- }
- /**
- * Periodically checks if any stations need to be skipped
- *
- * @returns {Promise} - returns promise (reject, resolve)
- */
- checkStationSkipTask() {
- return new Promise(resolve => {
- TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);
- async.waterfall(
- [
- next => {
- CacheModule.runJob("HGETALL", { table: "stations" })
- .then(response => next(null, response))
- .catch(next);
- },
- (stations, next) => {
- async.each(
- stations,
- (station, next2) => {
- if (station.paused || !station.currentSong || !station.currentSong.title)
- return next2();
- const timeElapsed = Date.now() - station.startedAt - station.timePaused;
- if (timeElapsed <= station.currentSong.duration) return next2();
- TasksModule.log(
- "ERROR",
- "TASK_STATIONS_SKIP_CHECK",
- `Skipping ${station._id} as it should have skipped already.`
- );
- return StationsModule.runJob("INITIALIZE_STATION", {
- stationId: station._id
- }).then(() => next2());
- },
- () => next()
- );
- }
- ],
- () => resolve()
- );
- });
- }
- /**
- * Periodically checks if any sessions are out of date and need to be cleared
- *
- * @returns {Promise} - returns promise (reject, resolve)
- */
- sessionClearingTask() {
- return new Promise(resolve => {
- TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);
- async.waterfall(
- [
- next => {
- CacheModule.runJob("HGETALL", { table: "sessions" })
- .then(sessions => next(null, sessions))
- .catch(next);
- },
- (sessions, next) => {
- if (!sessions) return next();
- const keys = Object.keys(sessions);
- return async.each(
- keys,
- (sessionId, next2) => {
- const session = sessions[sessionId];
- if (
- session &&
- session.refreshDate &&
- Date.now() - session.refreshDate < 60 * 60 * 24 * 30 * 1000
- )
- return next2();
- if (!session) {
- TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
- return CacheModule.runJob("HDEL", {
- table: "sessions",
- key: sessionId
- }).finally(() => {
- next2();
- });
- }
- if (!session.refreshDate) {
- session.refreshDate = Date.now();
- return CacheModule.runJob("HSET", {
- table: "sessions",
- key: sessionId,
- value: session
- }).finally(() => next2());
- }
- if (Date.now() - session.refreshDate > 60 * 60 * 24 * 30 * 1000) {
- return WSModule.runJob("SOCKETS_FROM_SESSION_ID", {
- sessionId: session.sessionId
- }).then(sockets => {
- if (sockets.length > 0) {
- session.refreshDate = Date.now();
- CacheModule.runJob("HSET", {
- table: "sessions",
- key: sessionId,
- value: session
- }).finally(() => {
- next2();
- });
- } else {
- TasksModule.log(
- "INFO",
- "TASK_SESSION_CLEAR",
- `Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
- );
- CacheModule.runJob("HDEL", {
- table: "sessions",
- key: session.sessionId
- }).finally(() => next2());
- }
- });
- }
- TasksModule.log("ERROR", "TASK_SESSION_CLEAR", "This should never log.");
- return next2();
- },
- () => next()
- );
- }
- ],
- () => resolve()
- );
- });
- }
- /**
- * Periodically warns about the size of any log files
- *
- * @returns {Promise} - returns promise (reject, resolve)
- */
- logFileSizeCheckTask() {
- return new Promise((resolve, reject) => {
- TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);
- async.each(
- ["all.log", "debugStation.log", "error.log", "info.log", "success.log"],
- (fileName, next) => {
- try {
- const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));
- const mb = stats.size / 1000000;
- if (mb > 25) return next(true);
- return next();
- } catch (err) {
- return next(err);
- }
- },
- async err => {
- if (err && err !== true) {
- err = await UtilsModule.runJob("GET_ERROR", { error: err });
- return reject(new Error(err));
- }
- if (err === true) {
- TasksModule.log(
- "ERROR",
- "LOGGER_FILE_SIZE_WARNING",
- "************************************WARNING*************************************"
- );
- TasksModule.log(
- "ERROR",
- "LOGGER_FILE_SIZE_WARNING",
- "***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"
- );
- TasksModule.log(
- "ERROR",
- "LOGGER_FILE_SIZE_WARNING",
- "****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"
- );
- TasksModule.log(
- "ERROR",
- "LOGGER_FILE_SIZE_WARNING",
- "********************************************************************************"
- );
- }
- return resolve();
- }
- );
- });
- }
- /**
- * Periodically collect users in stations
- *
- * @returns {Promise} - returns promise (reject, resolve)
- */
- async collectStationUsersTask() {
- const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
- return new Promise(resolve => {
- TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);
- const stationsCountUpdated = [];
- const stationsUpdated = [];
- const oldUsersPerStation = StationsModule.usersPerStation;
- const usersPerStation = { loggedIn: [], loggedOut: [] };
- const oldUsersPerStationCount = JSON.parse(JSON.stringify(StationsModule.usersPerStationCount));
- const usersPerStationCount = {};
- async.each(
- Object.keys(StationsModule.userList),
- (socketId, next) => {
- WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }).then(async socket => {
- const stationId = StationsModule.userList[socketId];
- const room = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
- room: `station.${stationId}`
- });
- if (!socket || !room.includes(socketId)) {
- if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
- if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(String(stationId));
- delete StationsModule.userList[socketId];
- return next();
- }
- if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0; // start count for station
- if (!usersPerStation[stationId]) usersPerStation[stationId] = { loggedIn: [], loggedOut: [] };
- return async.waterfall(
- [
- next => {
- if (!socket.session || !socket.session.sessionId) {
- return next("No session found.", { ip: socket.ip });
- }
- return CacheModule.runJob("HGET", {
- table: "sessions",
- key: socket.session.sessionId
- })
- .then(session => next(null, session))
- .catch(next);
- },
- (session, next) => {
- if (!session) return next("Session not found.");
- return userModel.findOne({ _id: session.userId }, next);
- },
- (user, next) => {
- if (!user) return next("User not found.");
- const state = socket.session.stationState ?? "unknown";
- const existingUserObject = usersPerStation[stationId].loggedIn.findLast(
- u => user.username === u.username
- );
- if (existingUserObject) {
- if (stationStateWorth[state] > stationStateWorth[existingUserObject.state]) {
- usersPerStation[stationId].loggedIn[
- usersPerStation[stationId].loggedIn.indexOf(existingUserObject)
- ].state = state;
- }
- return next("User already in the list.");
- }
- usersPerStationCount[stationId] += 1; // increment user count for station
- return next(null, {
- _id: user._id,
- username: user.username,
- name: user.name,
- avatar: user.avatar,
- state
- });
- }
- ],
- (err, user) => {
- if (!err) usersPerStation[stationId].loggedIn.push(user);
- // if user is logged out (an ip can only be counted once)
- if (
- err === "No session found." &&
- !usersPerStation[stationId].loggedOut.some(u => user.ip === u.ip)
- ) {
- usersPerStationCount[stationId] += 1; // increment user count for station
- usersPerStation[stationId].loggedOut.push(user);
- }
- next();
- }
- );
- });
- },
- () => {
- Object.keys(usersPerStationCount).forEach(stationId => {
- if (
- oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId] &&
- stationsCountUpdated.indexOf(stationId) === -1
- ) {
- this.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
- CacheModule.runJob("PUB", {
- channel: "station.updateUserCount",
- value: { stationId, usersPerStationCount: usersPerStationCount[stationId] }
- });
- }
- });
- Object.keys(usersPerStation).forEach(stationId => {
- if (
- !oldUsersPerStation[stationId] ||
- JSON.stringify(oldUsersPerStation[stationId]) !==
- JSON.stringify(usersPerStation[stationId]) ||
- oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]
- ) {
- this.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
- CacheModule.runJob("PUB", {
- channel: "station.updateUsers",
- value: { stationId, usersPerStation: usersPerStation[stationId] }
- });
- }
- });
- StationsModule.usersPerStationCount = usersPerStationCount;
- StationsModule.usersPerStation = usersPerStation;
- }
- );
- resolve();
- });
- }
- /**
- * Periodically removes any old history documents
- *
- * @returns {Promise} - returns promise (reject, resolve)
- */
- async historyClearTask() {
- TasksModule.log("INFO", "TASK_HISTORY_CLEAR", `Removing old history.`);
- const stationHistoryModel = await DBModule.runJob("GET_MODEL", { modelName: "stationHistory" });
- // Remove documents created more than 2 days ago
- const mongoQuery = { "payload.skippedAt": { $lt: new Date(new Date().getTime() - 1000 * 60 * 60 * 24 * 2) } };
- const count = await stationHistoryModel.count(mongoQuery);
- await stationHistoryModel.remove(mongoQuery);
- TasksModule.log("SUCCESS", "TASK_HISTORY_CLEAR", `Removed ${count} history items`);
- }
- }
- export default new _TasksModule();
|