ws.js 20 KB


  1. /**
  2. * @file
  3. */
  4. import config from "config";
  5. import async from "async";
  6. import WebSocket from "ws";
  7. import { EventEmitter } from "events";
  8. import CoreClass from "../core";
  9. let WSModule;
  10. let AppModule;
  11. let CacheModule;
  12. let UtilsModule;
  13. let DBModule;
  14. let PunishmentsModule;
  15. class _WSModule extends CoreClass {
  16. // eslint-disable-next-line require-jsdoc
  17. constructor() {
  18. super("ws");
  19. WSModule = this;
  20. }
  21. /**
  22. * Initialises the ws module
  23. *
  24. * @returns {Promise} - returns promise (reject, resolve)
  25. */
  26. async initialize() {
  27. this.setStage(1);
  28. AppModule = this.moduleManager.modules.app;
  29. CacheModule = this.moduleManager.modules.cache;
  30. UtilsModule = this.moduleManager.modules.utils;
  31. DBModule = this.moduleManager.modules.db;
  32. PunishmentsModule = this.moduleManager.modules.punishments;
  33. this.actions = (await import("./actions")).default;
  34. this.userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  35. this.setStage(2);
  36. this.SIDname = config.get("cookie.SIDname");
  37. // TODO: Check every 30s/, for all sockets, if they are still allowed to be in the rooms they are in, and on socket at all (permission changing/banning)
  38. const server = await AppModule.runJob("SERVER");
  39. // this._io.origins(config.get("cors.origin"));
  40. this._io = new WebSocket.Server({ server, path: "/ws" });
  41. this.rooms = {};
  42. return new Promise(resolve => {
  43. this.setStage(3);
  44. this._io.on("connection", async (socket, req) => {
  45. socket.dispatch = (...args) => socket.send(JSON.stringify(args));
  46. socket.actions = new EventEmitter();
  47. socket.actions.setMaxListeners(0);
  48. socket.listen = (target, cb) => socket.actions.addListener(target, args => cb(args));
  49. WSModule.runJob("HANDLE_WS_USE", { socket, req }).then(socket =>
  50. WSModule.runJob("HANDLE_WS_CONNECTION", { socket })
  51. );
  52. });
  53. this.setStage(4);
  54. return resolve();
  55. });
  56. }
  57. /**
  58. * Returns the websockets variable
  59. *
  60. * @returns {Promise} - returns a promise (resolve, reject)
  61. */
  62. WS() {
  63. return new Promise(resolve => resolve(WSModule._io));
  64. }
  65. /**
  66. * Obtains socket object for a specified socket id
  67. *
  68. * @param {object} payload - object containing the payload
  69. * @param {string} payload.socketId - the id of the socket
  70. * @returns {Promise} - returns promise (reject, resolve)
  71. */
  72. async SOCKET_FROM_SOCKET_ID(payload) {
  73. return new Promise((resolve, reject) => {
  74. const { clients } = WSModule._io;
  75. if (clients)
  76. return clients.forEach(socket => {
  77. if (socket.session.socketId === payload.socketId) resolve(socket);
  78. });
  79. return reject();
  80. });
  81. }
  82. /**
  83. * Gets all sockets for a specified session id
  84. *
  85. * @param {object} payload - object containing the payload
  86. * @param {string} payload.sessionId - user session id
  87. * @returns {Promise} - returns promise (reject, resolve)
  88. */
  89. async SOCKETS_FROM_SESSION_ID(payload) {
  90. return new Promise(resolve => {
  91. const { clients } = WSModule._io;
  92. const sockets = [];
  93. if (clients) {
  94. return async.each(
  95. Object.keys(clients),
  96. (id, next) => {
  97. const { session } = clients[id];
  98. if (session.sessionId === payload.sessionId) sockets.push(session.sessionId);
  99. next();
  100. },
  101. () => resolve(sockets)
  102. );
  103. }
  104. return resolve();
  105. });
  106. }
  107. /**
  108. * Returns any sockets for a specific user
  109. *
  110. * @param {object} payload - object that contains the payload
  111. * @param {string} payload.userId - the user id
  112. * @returns {Promise} - returns promise (reject, resolve)
  113. */
  114. async SOCKETS_FROM_USER(payload) {
  115. return new Promise((resolve, reject) => {
  116. const sockets = [];
  117. return async.eachLimit(
  118. WSModule._io.clients,
  119. 1,
  120. (socket, next) => {
  121. const { sessionId } = socket.session;
  122. if (sessionId) {
  123. return CacheModule.runJob("HGET", { table: "sessions", key: sessionId }, this)
  124. .then(session => {
  125. if (session && session.userId === payload.userId) sockets.push(socket);
  126. next();
  127. })
  128. .catch(err => next(err));
  129. }
  130. return next();
  131. },
  132. err => {
  133. if (err) return reject(err);
  134. return resolve(sockets);
  135. }
  136. );
  137. });
  138. }
  139. /**
  140. * Returns any sockets from a specific ip address
  141. *
  142. * @param {object} payload - object that contains the payload
  143. * @param {string} payload.ip - the ip address in question
  144. * @returns {Promise} - returns promise (reject, resolve)
  145. */
  146. async SOCKETS_FROM_IP(payload) {
  147. return new Promise(resolve => {
  148. const { clients } = WSModule._io;
  149. const sockets = [];
  150. return async.each(
  151. Object.keys(clients),
  152. (id, next) => {
  153. const { session } = clients[id];
  154. CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId }, this)
  155. .then(session => {
  156. if (session && clients[id].ip === payload.ip) sockets.push(clients[id]);
  157. next();
  158. })
  159. .catch(() => next());
  160. },
  161. () => resolve(sockets)
  162. );
  163. });
  164. }
  165. /**
  166. * Returns any sockets from a specific user without using redis/cache
  167. *
  168. * @param {object} payload - object that contains the payload
  169. * @param {string} payload.userId - the id of the user in question
  170. * @returns {Promise} - returns promise (reject, resolve)
  171. */
  172. async SOCKETS_FROM_USER_WITHOUT_CACHE(payload) {
  173. return new Promise(resolve => {
  174. const { clients } = WSModule._io;
  175. const sockets = [];
  176. if (clients) {
  177. return async.each(
  178. Object.keys(clients),
  179. (id, next) => {
  180. const { session } = clients[id];
  181. if (session.userId === payload.userId) sockets.push(clients[id]);
  182. next();
  183. },
  184. () => resolve(sockets)
  185. );
  186. }
  187. return resolve();
  188. });
  189. }
  190. /**
  191. * Allows a socket to leave any rooms they are connected to
  192. *
  193. * @param {object} payload - object that contains the payload
  194. * @param {string} payload.socketId - the id of the socket which should leave all their rooms
  195. * @returns {Promise} - returns promise (reject, resolve)
  196. */
  197. async SOCKET_LEAVE_ROOMS(payload) {
  198. return new Promise(resolve => {
  199. // filter out rooms that the user is in
  200. Object.keys(WSModule.rooms).forEach(room => {
  201. WSModule.rooms[room] = WSModule.rooms[room].filter(participant => participant !== payload.socketId);
  202. });
  203. return resolve();
  204. });
  205. }
  206. /**
  207. * Allows a socket to join a specified room (this will remove them from any rooms they are currently in)
  208. *
  209. * @param {object} payload - object that contains the payload
  210. * @param {string} payload.socketId - the id of the socket which should join the room
  211. * @param {string} payload.room - the name of the room
  212. * @returns {Promise} - returns promise (reject, resolve)
  213. */
  214. async SOCKET_JOIN_ROOM(payload) {
  215. const { room, socketId } = payload;
  216. // leave all other rooms
  217. await WSModule.runJob("SOCKET_LEAVE_ROOMS", { socketId }, this);
  218. return new Promise(resolve => {
  219. // create room if it doesn't exist, and add socketId to array
  220. if (WSModule.rooms[room]) WSModule.rooms[room].push(socketId);
  221. else WSModule.rooms[room] = [socketId];
  222. return resolve();
  223. });
  224. }
  225. /**
  226. * Emits arguments to any sockets that are in a specified a room
  227. *
  228. * @param {object} payload - object that contains the payload
  229. * @param {string} payload.room - the name of the room to emit arguments
  230. * @param {object} payload.args - any arguments to be emitted to the sockets in the specific room
  231. * @returns {Promise} - returns promise (reject, resolve)
  232. */
  233. async EMIT_TO_ROOM(payload) {
  234. return new Promise(resolve => {
  235. // if the room exists
  236. if (WSModule.rooms[payload.room])
  237. return WSModule.rooms[payload.room].forEach(async socketId => {
  238. // get every socketId (and thus every socket) in the room, and dispatch to each
  239. const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this);
  240. socket.dispatch(...payload.args);
  241. });
  242. return resolve();
  243. });
  244. }
  245. /**
  246. * Allows a socket to join a 'song' room
  247. *
  248. * @param {object} payload - object that contains the payload
  249. * @param {string} payload.socketId - the id of the socket which should join the room
  250. * @param {string} payload.room - the name of the room
  251. * @returns {Promise} - returns promise (reject, resolve)
  252. */
  253. async SOCKET_JOIN_SONG_ROOM(payload) {
  254. const { room, socketId } = payload;
  255. // leave any other song rooms the user is in
  256. await WSModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", { sockets: [socketId] }, this);
  257. return new Promise(resolve => {
  258. // join the room
  259. if (WSModule.rooms[room]) WSModule.rooms[room].push(socketId);
  260. else WSModule.rooms[room] = [socketId];
  261. return resolve();
  262. });
  263. }
  264. /**
  265. * Allows multiple sockets to join a 'song' room
  266. *
  267. * @param {object} payload - object that contains the payload
  268. * @param {Array} payload.sockets - array of socketIds
  269. * @param {object} payload.room - the name of the room
  270. * @returns {Promise} - returns promise (reject, resolve)
  271. */
  272. SOCKETS_JOIN_SONG_ROOM(payload) {
  273. return new Promise(resolve => {
  274. payload.sockets.forEach(socketId => WSModule.runJob("SOCKET_JOIN_SONG_ROOM", { socketId }, this));
  275. return resolve();
  276. });
  277. }
  278. /**
  279. * Allows multiple sockets to leave any 'song' rooms they are in
  280. *
  281. * @param {object} payload - object that contains the payload
  282. * @param {Array} payload.sockets - array of socketIds
  283. * @returns {Promise} - returns promise (reject, resolve)
  284. */
  285. SOCKETS_LEAVE_SONG_ROOMS(payload) {
  286. return new Promise(resolve => {
  287. payload.sockets.forEach(async socketId => {
  288. const rooms = await WSModule.runJob("GET_ROOMS_FOR_SOCKET", { socketId }, this);
  289. rooms.forEach(room => {
  290. if (room.indexOf("song.") !== -1)
  291. WSModule.rooms[room] = WSModule.rooms[room].filter(
  292. participant => participant !== payload.socketId
  293. );
  294. });
  295. });
  296. resolve();
  297. });
  298. }
  299. /**
  300. * Gets any sockets connected to a room
  301. *
  302. * @param {object} payload - object that contains the payload
  303. * @param {string} payload.room - the name of the room
  304. * @returns {Promise} - returns promise (reject, resolve)
  305. */
  306. async GET_SOCKETS_FOR_ROOM(payload) {
  307. return new Promise(resolve => {
  308. if (WSModule.rooms[payload.room]) return resolve(WSModule.rooms[payload.room]);
  309. return resolve([]);
  310. });
  311. }
  312. /**
  313. * Gets any rooms a socket is connected to
  314. *
  315. * @param {object} payload - object that contains the payload
  316. * @param {string} payload.socketId - the id of the socket to check the rooms for
  317. * @returns {Promise} - returns promise (reject, resolve)
  318. */
  319. async GET_ROOMS_FOR_SOCKET(payload) {
  320. return new Promise(resolve => {
  321. const rooms = [];
  322. Object.keys(WSModule.rooms).forEach(room => {
  323. if (WSModule.rooms[room].includes(payload.socketId)) rooms.push(room);
  324. });
  325. return resolve(rooms);
  326. });
  327. }
  328. /**
  329. * Handles use of websockets
  330. *
  331. * @param {object} payload - object that contains the payload
  332. * @returns {Promise} - returns promise (reject, resolve)
  333. */
  334. async HANDLE_WS_USE(payload) {
  335. return new Promise(resolve => {
  336. const { socket, req } = payload;
  337. let SID = "";
  338. socket.ip = req.headers["x-forwarded-for"] || "0..0.0";
  339. return async.waterfall(
  340. [
  341. next => {
  342. if (!req.headers.cookie) return next("No cookie exists yet.");
  343. return UtilsModule.runJob("PARSE_COOKIES", { cookieString: req.headers.cookie }, this).then(
  344. res => {
  345. SID = res[WSModule.SIDname];
  346. next(null);
  347. }
  348. );
  349. },
  350. next => {
  351. if (!SID) return next("No SID.");
  352. return next();
  353. },
  354. // see if session exists for cookie
  355. next => {
  356. CacheModule.runJob("HGET", { table: "sessions", key: SID }, this)
  357. .then(session => next(null, session))
  358. .catch(next);
  359. },
  360. (session, next) => {
  361. if (!session) return next("No session found.");
  362. session.refreshDate = Date.now();
  363. socket.session = session;
  364. return CacheModule.runJob(
  365. "HSET",
  366. { table: "sessions", key: SID, value: session },
  367. this
  368. ).then(session => next(null, session));
  369. },
  370. (res, next) => {
  371. // check if a session's user / IP is banned
  372. PunishmentsModule.runJob("GET_PUNISHMENTS", {}, this)
  373. .then(punishments => {
  374. const isLoggedIn = !!(socket.session && socket.session.refreshDate);
  375. const userId = isLoggedIn ? socket.session.userId : null;
  376. const banishment = {
  377. banned: false,
  378. ban: 0
  379. };
  380. punishments.forEach(punishment => {
  381. if (punishment.expiresAt > banishment.ban) banishment.ban = punishment;
  382. if (punishment.type === "banUserId" && isLoggedIn && punishment.value === userId)
  383. banishment.banned = true;
  384. if (punishment.type === "banUserIp" && punishment.value === socket.ip)
  385. banishment.banned = true;
  386. });
  387. socket.banishment = banishment;
  388. next();
  389. })
  390. .catch(() => next());
  391. }
  392. ],
  393. () => {
  394. if (!socket.session) socket.session = { socketId: req.headers["sec-websocket-key"] };
  395. else socket.session.socketId = req.headers["sec-websocket-key"];
  396. resolve(socket);
  397. }
  398. );
  399. });
  400. }
  401. /**
  402. * Handles a websocket connection
  403. *
  404. * @param {object} payload - object that contains the payload
  405. * @param {object} payload.socket - socket itself
  406. * @returns {Promise} - returns promise (reject, resolve)
  407. */
  408. async HANDLE_WS_CONNECTION(payload) {
  409. return new Promise(resolve => {
  410. const { socket } = payload;
  411. let sessionInfo = "";
  412. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  413. // if session is banned
  414. if (socket.banishment && socket.banishment.banned) {
  415. WSModule.log(
  416. "INFO",
  417. "IO_BANNED_CONNECTION",
  418. `A user tried to connect, but is currently banned. IP: ${socket.ip}.${sessionInfo}`
  419. );
  420. socket.dispatch("keep.event:banned", socket.banishment.ban);
  421. return socket.close(); // close socket connection
  422. }
  423. WSModule.log("INFO", "IO_CONNECTION", `User connected. IP: ${socket.ip}.${sessionInfo}`);
  424. // catch when the socket has been disconnected
  425. socket.on("close", async () => {
  426. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  427. WSModule.log("INFO", "IO_DISCONNECTION", `User disconnected. IP: ${socket.ip}.${sessionInfo}`);
  428. // leave all rooms when a socket connection is closed (to prevent rooms object building up)
  429. await WSModule.runJob("SOCKET_LEAVE_ROOMS", { socketId: socket.session.socketId });
  430. });
  431. // catch errors on the socket
  432. socket.onerror = error => {
  433. console.error("SOCKET ERROR: ", error);
  434. };
  435. if (socket.session.sessionId) {
  436. CacheModule.runJob("HGET", {
  437. table: "sessions",
  438. key: socket.session.sessionId
  439. })
  440. .then(session => {
  441. if (session && session.userId) {
  442. WSModule.userModel.findOne({ _id: session.userId }, (err, user) => {
  443. if (err || !user) return socket.dispatch("ready", false);
  444. let role = "";
  445. let username = "";
  446. let userId = "";
  447. if (user) {
  448. role = user.role;
  449. username = user.username;
  450. userId = session.userId;
  451. }
  452. return socket.dispatch("ready", true, role, username, userId);
  453. });
  454. } else socket.dispatch("ready", false);
  455. })
  456. .catch(() => socket.dispatch("ready", false));
  457. } else socket.dispatch("ready", false);
  458. socket.onmessage = message => {
  459. const data = JSON.parse(message.data);
  460. if (data.length === 0) return socket.dispatch("ERROR", "Not enough arguments specified.");
  461. if (typeof data[0] !== "string") return socket.dispatch("ERROR", "First argument must be a string.");
  462. const namespaceAction = data[0];
  463. if (
  464. !namespaceAction ||
  465. namespaceAction.indexOf(".") === -1 ||
  466. namespaceAction.indexOf(".") !== namespaceAction.lastIndexOf(".")
  467. )
  468. return socket.dispatch("ERROR", "Invalid first argument");
  469. const namespace = data[0].split(".")[0];
  470. const action = data[0].split(".")[1];
  471. if (!namespace) return socket.dispatch("ERROR", "Invalid namespace.");
  472. if (!action) return socket.dispatch("ERROR", "Invalid action.");
  473. if (!WSModule.actions[namespace]) return socket.dispatch("ERROR", "Namespace not found.");
  474. if (!WSModule.actions[namespace][action]) return socket.dispatch("ERROR", "Action not found.");
  475. if (data[data.length - 1].CB_REF) {
  476. const { CB_REF } = data[data.length - 1];
  477. data.pop();
  478. return socket.actions.emit(data.shift(0), [...data, res => socket.dispatch("CB_REF", CB_REF, res)]);
  479. }
  480. return socket.actions.emit(data.shift(0), data);
  481. };
  482. // have the socket listen for each action
  483. Object.keys(WSModule.actions).forEach(namespace => {
  484. Object.keys(WSModule.actions[namespace]).forEach(action => {
  485. // the full name of the action
  486. const name = `${namespace}.${action}`;
  487. // listen for this action to be called
  488. socket.listen(name, async args =>
  489. WSModule.runJob("RUN_ACTION", { socket, namespace, action, args })
  490. );
  491. });
  492. });
  493. return resolve();
  494. });
  495. }
  496. /**
  497. * Runs an action
  498. *
  499. * @param {object} payload - object that contains the payload
  500. * @returns {Promise} - returns promise (reject, resolve)
  501. */
  502. async RUN_ACTION(payload) {
  503. return new Promise((resolve, reject) => {
  504. const { socket, namespace, action, args } = payload;
  505. // the full name of the action
  506. const name = `${namespace}.${action}`;
  507. let cb = args[args.length - 1];
  508. if (typeof cb !== "function")
  509. cb = () => {
  510. WSModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
  511. };
  512. else args.pop();
  513. WSModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
  514. // load the session from the cache
  515. new Promise(resolve => {
  516. if (socket.session.sessionId)
  517. CacheModule.runJob("HGET", {
  518. table: "sessions",
  519. key: socket.session.sessionId
  520. })
  521. .then(session => {
  522. // make sure the sockets sessionId isn't set if there is no session
  523. if (socket.session.sessionId && session === null) delete socket.session.sessionId;
  524. resolve();
  525. })
  526. .catch(() => {
  527. if (typeof cb === "function")
  528. cb({
  529. status: "error",
  530. message: "An error occurred while obtaining your session"
  531. });
  532. reject(new Error("An error occurred while obtaining the session"));
  533. });
  534. else resolve();
  535. })
  536. .then(() => {
  537. // call the job that calls the action, passing it the session, and the arguments the websocket passed us
  538. WSModule.runJob("RUN_ACTION2", { session: socket.session, namespace, action, args }, this)
  539. .then(response => {
  540. cb(response);
  541. resolve();
  542. })
  543. .catch(err => {
  544. if (typeof cb === "function")
  545. cb({
  546. status: "error",
  547. message: "An error occurred while executing the specified action."
  548. });
  549. reject(err);
  550. WSModule.log(
  551. "ERROR",
  552. "IO_ACTION_ERROR",
  553. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  554. );
  555. });
  556. })
  557. .catch(reject);
  558. });
  559. }
  560. /**
  561. * Runs an action
  562. *
  563. * @param {object} payload - object that contains the payload
  564. * @returns {Promise} - returns promise (reject, resolve)
  565. */
  566. async RUN_ACTION2(payload) {
  567. return new Promise((resolve, reject) => {
  568. const { session, namespace, action, args } = payload;
  569. try {
  570. // call the the action, passing it the session, and the arguments the websocket passed us
  571. WSModule.actions[namespace][action].apply(
  572. this,
  573. [session].concat(args).concat([
  574. result => {
  575. WSModule.log(
  576. "INFO",
  577. "RUN_ACTION2",
  578. `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
  579. );
  580. resolve(result);
  581. }
  582. ])
  583. );
  584. } catch (err) {
  585. reject(err);
  586. WSModule.log(
  587. "ERROR",
  588. "IO_ACTION_ERROR",
  589. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  590. );
  591. }
  592. });
  593. }
  594. }
  595. export default new _WSModule();