/** * @file */ import config from "config"; import async from "async"; import WebSocket from "ws"; import { EventEmitter } from "events"; import CoreClass from "../core"; let IOModule; let AppModule; let CacheModule; let UtilsModule; let DBModule; let PunishmentsModule; class _IOModule extends CoreClass { // eslint-disable-next-line require-jsdoc constructor() { super("io"); IOModule = this; } /** * Initialises the io module * * @returns {Promise} - returns promise (reject, resolve) */ async initialize() { this.setStage(1); AppModule = this.moduleManager.modules.app; CacheModule = this.moduleManager.modules.cache; UtilsModule = this.moduleManager.modules.utils; DBModule = this.moduleManager.modules.db; PunishmentsModule = this.moduleManager.modules.punishments; this.actions = (await import("./actions")).default; this.userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }); this.setStage(2); this.SIDname = config.get("cookie.SIDname"); // TODO: Check every 30s/, for all sockets, if they are still allowed to be in the rooms they are in, and on socket at all (permission changing/banning) const server = await AppModule.runJob("SERVER"); // this._io.origins(config.get("cors.origin")); this._io = new WebSocket.Server({ server, path: "/ws" }); this.rooms = {}; return new Promise(resolve => { this.setStage(3); this.setStage(4); this._io.on("connection", async (socket, req) => { socket.dispatch = (...args) => socket.send(JSON.stringify(args)); console.log(socket.actions); socket.actions = new EventEmitter(); socket.actions.setMaxListeners(0); socket.listen = (target, cb) => socket.actions.addListener(target, args => cb(args)); IOModule.runJob("HANDLE_IO_USE", { socket, req }).then(socket => IOModule.runJob("HANDLE_IO_CONNECTION", { socket }) ); }); this.setStage(5); return resolve(); }); } /** * Returns the socket io variable * * @returns {Promise} - returns a promise (resolve, reject) */ IO() { return new Promise(resolve => { resolve(IOModule._io); }); } /** * Obtains socket object for a specified socket id * * @param {object} payload - object containing the payload * @param {string} payload.socketId - the id of the socket * @returns {Promise} - returns promise (reject, resolve) */ async SOCKET_FROM_SOCKET_ID(payload) { return new Promise((resolve, reject) => { const { clients } = IOModule._io; if (clients) { return clients.forEach(socket => { if (socket.session.socketId === payload.socketId) resolve(socket); }); } return reject(); }); } /** * Gets all sockets for a specified session id * * @param {object} payload - object containing the payload * @param {string} payload.sessionId - user session id * @returns {Promise} - returns promise (reject, resolve) */ async SOCKETS_FROM_SESSION_ID(payload) { return new Promise(resolve => { const { clients } = IOModule._io; const sockets = []; if (clients) { return async.each( Object.keys(clients), (id, next) => { const { session } = clients[id]; if (session.sessionId === payload.sessionId) sockets.push(session.sessionId); next(); }, () => resolve(sockets) ); } return resolve(); }); } /** * Returns any sockets for a specific user * * @param {object} payload - object that contains the payload * @param {string} payload.userId - the user id * @returns {Promise} - returns promise (reject, resolve) */ async SOCKETS_FROM_USER(payload) { return new Promise((resolve, reject) => { const sockets = []; return async.eachLimit( Object.keys(IOModule._io.clients), 1, (id, next) => { const { session } = IOModule._io.clients[id]; console.log(1, session); if (session.sessionId) { return CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId }, this) .then(session => { console.log(2, session, payload.userId); if (session && session.userId === payload.userId) sockets.push(IOModule._io.clients[id]); next(); }) .catch(err => next(err)); } return next(); }, err => { console.log("SOCKETS_FROM_USER", sockets, err); if (err) return reject(err); return resolve(sockets); } ); }); } /** * Returns any sockets from a specific ip address * * @param {object} payload - object that contains the payload * @param {string} payload.ip - the ip address in question * @returns {Promise} - returns promise (reject, resolve) */ async SOCKETS_FROM_IP(payload) { return new Promise(resolve => { const { clients } = IOModule._io; const sockets = []; return async.each( Object.keys(clients), (id, next) => { const { session } = clients[id]; CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId }, this) .then(session => { if (session && clients[id].ip === payload.ip) sockets.push(clients[id]); next(); }) .catch(() => next()); }, () => resolve(sockets) ); }); } /** * Returns any sockets from a specific user without using redis/cache * * @param {object} payload - object that contains the payload * @param {string} payload.userId - the id of the user in question * @returns {Promise} - returns promise (reject, resolve) */ async SOCKETS_FROM_USER_WITHOUT_CACHE(payload) { return new Promise(resolve => { const { clients } = IOModule._io; const sockets = []; if (clients) { return async.each( Object.keys(clients), (id, next) => { const { session } = clients[id]; if (session.userId === payload.userId) sockets.push(clients[id]); next(); }, () => resolve(sockets) ); } return resolve(); }); } /** * Allows a socket to leave any rooms they are connected to * * @param {object} payload - object that contains the payload * @param {string} payload.socketId - the id of the socket which should leave all their rooms * @returns {Promise} - returns promise (reject, resolve) */ async SOCKET_LEAVE_ROOMS(payload) { return new Promise(resolve => { // filter out rooms that the user is in Object.keys(IOModule.rooms).forEach(room => { IOModule.rooms[room] = IOModule.rooms[room].filter(participant => participant !== payload.socketId); }); return resolve(); }); } /** * Allows a socket to join a specified room (this will remove them from any rooms they are currently in) * * @param {object} payload - object that contains the payload * @param {string} payload.socketId - the id of the socket which should join the room * @param {object} payload.room - the object representing the room the socket should join * @returns {Promise} - returns promise (reject, resolve) */ async SOCKET_JOIN_ROOM(payload) { const { room, socketId } = payload; console.log("JOIN_ROOM", payload.room); // leave all other rooms await IOModule.runJob("SOCKET_LEAVE_ROOMS", { socketId }, this); return new Promise(resolve => { // create room if it doesn't exist, and add socketId to array if (IOModule.rooms[room]) IOModule.rooms[room].push(socketId); else IOModule.rooms[room] = [socketId]; return resolve(); }); } /** * Emits arguments to any sockets that are in a specified a room * * @param {object} payload - object that contains the payload * @param {string} payload.room - the name of the room to emit arguments * @param {object} payload.args - any arguments to be emitted to the sockets in the specific room * @returns {Promise} - returns promise (reject, resolve) */ async EMIT_TO_ROOM(payload) { console.log("EMIT_TO_ROOM", payload.room); return new Promise(resolve => { // IOModule._io.clients.forEach(socket => { // console.log(1234); // console.log(socket, socket.rooms); // if (socket.rooms[payload.room]) { // console.log(payload.args); // socket.dispatch(...payload.args); // } // }); console.log(IOModule.rooms, payload.room); if (IOModule.rooms[payload.room]) return IOModule.rooms[payload.room].forEach(async socketId => { const socket = await IOModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this); socket.dispatch(...payload.args); }); return resolve(); }); } // UNKNOWN // eslint-disable-next-line require-jsdoc async SOCKET_JOIN_SONG_ROOM(payload) { // socketId, room const socket = await IOModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId: payload.socketId }, this); return new Promise(resolve => { const { rooms } = socket; Object.keys(rooms).forEach(roomKey => { const room = rooms[roomKey]; if (room.indexOf("song.") !== -1) socket.leave(room); }); socket.join(payload.room); return resolve(); }); } // UNKNOWN // eslint-disable-next-line require-jsdoc SOCKETS_JOIN_SONG_ROOM(payload) { // sockets, room return new Promise(resolve => { Object.keys(payload.sockets).forEach(socketKey => { const socket = payload.sockets[socketKey]; const { rooms } = socket; Object.keys(rooms).forEach(roomKey => { const room = rooms[roomKey]; if (room.indexOf("song.") !== -1) socket.leave(room); }); socket.join(payload.room); }); return resolve(); }); } // UNKNOWN // eslint-disable-next-line require-jsdoc SOCKETS_LEAVE_SONG_ROOMS(payload) { // sockets return new Promise(resolve => { Object.keys(payload.sockets).forEach(socketKey => { const socket = payload.sockets[socketKey]; const { rooms } = socket; Object.keys(rooms).forEach(roomKey => { const room = rooms[roomKey]; if (room.indexOf("song.") !== -1) socket.leave(room); }); }); resolve(); }); } /** * Gets any sockets connected to a room * * @param {object} payload - object that contains the payload * @param {string} payload.room - the name of the room * @returns {Promise} - returns promise (reject, resolve) */ async GET_ROOM_SOCKETS(payload) { return new Promise(resolve => { if (IOModule.rooms[payload.room]) return resolve(IOModule.rooms[payload.room]); return resolve([]); }); } /** * Handles io.use * * @param {object} payload - object that contains the payload * @returns {Promise} - returns promise (reject, resolve) */ async HANDLE_IO_USE(payload) { console.log("io use"); return new Promise(resolve => { const { socket, req } = payload; let SID; socket.ip = req.headers["x-forwarded-for"] || "0.0.0.0"; return async.waterfall( [ next => { if (!req.headers.cookie) return next("No cookie exists yet."); return UtilsModule.runJob("PARSE_COOKIES", { cookieString: req.headers.cookie }, this).then( res => { SID = res[IOModule.SIDname]; next(null); } ); }, next => { if (!SID) return next("No SID."); return next(); }, next => { CacheModule.runJob("HGET", { table: "sessions", key: SID }, this) .then(session => next(null, session)) .catch(next); }, (session, next) => { if (!session) return next("No session found."); session.refreshDate = Date.now(); socket.session = session; return CacheModule.runJob( "HSET", { table: "sessions", key: SID, value: session }, this ).then(session => next(null, session)); }, (res, next) => { // check if a session's user / IP is banned PunishmentsModule.runJob("GET_PUNISHMENTS", {}, this) .then(punishments => { const isLoggedIn = !!(socket.session && socket.session.refreshDate); const userId = isLoggedIn ? socket.session.userId : null; const banishment = { banned: false, ban: 0 }; punishments.forEach(punishment => { if (punishment.expiresAt > banishment.ban) banishment.ban = punishment; if (punishment.type === "banUserId" && isLoggedIn && punishment.value === userId) banishment.banned = true; if (punishment.type === "banUserIp" && punishment.value === socket.ip) banishment.banned = true; }); socket.banishment = banishment; next(); }) .catch(() => { next(); }); } ], () => { if (!socket.session) socket.session = { socketId: req.headers["sec-websocket-key"] }; else socket.session.socketId = req.headers["sec-websocket-key"]; // cb(); resolve(socket); } ); }); } /** * Handles io.connection * * @param {object} payload - object that contains the payload * @returns {Promise} - returns promise (reject, resolve) */ async HANDLE_IO_CONNECTION(payload) { console.log("handle io connection"); return new Promise(resolve => { const { socket } = payload; let sessionInfo = ""; if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`; // if session is banned if (socket.banishment && socket.banishment.banned) { IOModule.log( "INFO", "IO_BANNED_CONNECTION", `A user tried to connect, but is currently banned. IP: ${socket.ip}.${sessionInfo}` ); socket.dispatch("keep.event:banned", socket.banishment.ban); return socket.disconnect(true); // need to fix } IOModule.log("INFO", "IO_CONNECTION", `User connected. IP: ${socket.ip}.${sessionInfo}`); // catch when the socket has been disconnected socket.on("close", async () => { if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`; IOModule.log("INFO", "IO_DISCONNECTION", `User disconnected. IP: ${socket.ip}.${sessionInfo}`); await IOModule.runJob("SOCKET_LEAVE_ROOMS", { socketId: socket.session.socketId }); }); // socket.use((data, next) => { // if (data.length === 0) return next(new Error("Not enough arguments specified.")); // if (typeof data[0] !== "string") return next(new Error("First argument must be a string.")); // const namespaceAction = data[0]; // if ( // !namespaceAction || // namespaceAction.indexOf(".") === -1 || // namespaceAction.indexOf(".") !== namespaceAction.lastIndexOf(".") // ) // return next(new Error("Invalid first argument")); // const namespace = data[0].split(".")[0]; // const action = data[0].split(".")[1]; // if (!namespace) return next(new Error("Invalid namespace.")); // if (!action) return next(new Error("Invalid action.")); // if (!IOModule.actions[namespace]) return next(new Error("Namespace not found.")); // if (!IOModule.actions[namespace][action]) return next(new Error("Action not found.")); // return next(); // }); // catch errors on the socket (internal to socket.io) socket.onerror = console.error; // need to update if (socket.session.sessionId) { CacheModule.runJob("HGET", { table: "sessions", key: socket.session.sessionId }) .then(session => { if (session && session.userId) { IOModule.userModel.findOne({ _id: session.userId }, (err, user) => { if (err || !user) return socket.dispatch("ready", false); let role = ""; let username = ""; let userId = ""; if (user) { role = user.role; username = user.username; userId = session.userId; } return socket.dispatch("ready", true, role, username, userId); }); } else socket.dispatch("ready", false); }) .catch(() => { socket.dispatch("ready", false); }); } else socket.dispatch("ready", false); // have the socket listen for each action Object.keys(IOModule.actions).forEach(namespace => { Object.keys(IOModule.actions[namespace]).forEach(action => { // the full name of the action const name = `${namespace}.${action}`; socket.onmessage = message => { const data = JSON.parse(message.data); if (data[data.length - 1].callbackRef) { const { callbackRef } = data[data.length - 1]; data.pop(); return socket.actions.emit(data.shift(0), [ ...data, res => socket.dispatch("callbackRef", callbackRef, res) ]); } return socket.actions.emit(data.shift(0), data); }; // listen for this action to be called socket.listen(name, async args => IOModule.runJob("RUN_ACTION", { socket, namespace, action, args }) ); }); }); return resolve(); }); } /** * Runs an action * * @param {object} payload - object that contains the payload * @returns {Promise} - returns promise (reject, resolve) */ async RUN_ACTION(payload) { return new Promise((resolve, reject) => { const { socket, namespace, action, args } = payload; // the full name of the action const name = `${namespace}.${action}`; let cb = args[args.length - 1]; if (typeof cb !== "function") cb = () => { IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`); }; else args.pop(); IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`); // load the session from the cache new Promise(resolve => { if (socket.session.sessionId) CacheModule.runJob("HGET", { table: "sessions", key: socket.session.sessionId }) .then(session => { // make sure the sockets sessionId isn't set if there is no session if (socket.session.sessionId && session === null) delete socket.session.sessionId; resolve(); }) .catch(() => { if (typeof cb === "function") cb({ status: "error", message: "An error occurred while obtaining your session" }); reject(new Error("An error occurred while obtaining the session")); }); else resolve(); }) .then(() => { // call the job that calls the action, passing it the session, and the arguments socket.io passed us IOModule.runJob("RUN_ACTION2", { session: socket.session, namespace, action, args }, this) .then(response => { cb(response); resolve(); }) .catch(err => { if (typeof cb === "function") cb({ status: "error", message: "An error occurred while executing the specified action." }); reject(err); IOModule.log( "ERROR", "IO_ACTION_ERROR", `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}` ); }); }) .catch(reject); }); } /** * Runs an action * * @param {object} payload - object that contains the payload * @returns {Promise} - returns promise (reject, resolve) */ async RUN_ACTION2(payload) { return new Promise((resolve, reject) => { const { session, namespace, action, args } = payload; try { // call the the action, passing it the session, and the arguments socket.io passed us IOModule.actions[namespace][action].apply( this, [session].concat(args).concat([ result => { IOModule.log( "INFO", "RUN_ACTION2", `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}` ); resolve(result); } ]) ); } catch (err) { reject(err); IOModule.log( "ERROR", "IO_ACTION_ERROR", `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}` ); } }); } } export default new _IOModule();