io.js 21 KB


  1. /**
  2. * @file
  3. */
  4. import config from "config";
  5. import async from "async";
  6. import socketio from "socket.io";
  7. import CoreClass from "../core";
  8. let IOModule;
  9. let AppModule;
  10. let CacheModule;
  11. let UtilsModule;
  12. let DBModule;
  13. let PunishmentsModule;
  14. class _IOModule extends CoreClass {
  15. // eslint-disable-next-line require-jsdoc
  16. constructor() {
  17. super("io");
  18. IOModule = this;
  19. }
  20. /**
  21. * Initialises the io module
  22. *
  23. * @returns {Promise} - returns promise (reject, resolve)
  24. */
  25. async initialize() {
  26. this.setStage(1);
  27. AppModule = this.moduleManager.modules.app;
  28. CacheModule = this.moduleManager.modules.cache;
  29. UtilsModule = this.moduleManager.modules.utils;
  30. DBModule = this.moduleManager.modules.db;
  31. PunishmentsModule = this.moduleManager.modules.punishments;
  32. this.actions = (await import("./actions")).default;
  33. this.userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  34. this.setStage(2);
  35. this.SIDname = config.get("cookie.SIDname");
  36. // TODO: Check every 30s/, for all sockets, if they are still allowed to be in the rooms they are in, and on socket at all (permission changing/banning)
  37. const server = await AppModule.runJob("SERVER");
  38. this._io = socketio(server);
  39. // this._io.origins(config.get("cors.origin"));
  40. return new Promise(resolve => {
  41. this.setStage(3);
  42. this._io.use(async (socket, cb) => {
  43. IOModule.runJob("HANDLE_IO_USE", { socket, cb });
  44. });
  45. this.setStage(4);
  46. this._io.on("connection", async socket => {
  47. IOModule.runJob("HANDLE_IO_CONNECTION", { socket });
  48. });
  49. this.setStage(5);
  50. return resolve();
  51. });
  52. }
  53. /**
  54. * Returns the socket io variable
  55. *
  56. * @returns {Promise} - returns a promise (resolve, reject)
  57. */
  58. IO() {
  59. return new Promise(resolve => {
  60. resolve(IOModule._io);
  61. });
  62. }
  63. /**
  64. * Returns whether there is a socket for a session id or not
  65. *
  66. * @param {object} payload - object containing the payload
  67. * @param {string} payload.sessionId - user session id
  68. * @returns {Promise} - returns promise (reject, resolve)
  69. */
  70. async SOCKET_FROM_SESSION(payload) {
  71. // socketId
  72. return new Promise((resolve, reject) => {
  73. const ns = IOModule._io.of("/");
  74. if (ns) {
  75. return resolve(ns.connected[payload.socketId]);
  76. }
  77. return reject();
  78. });
  79. }
  80. /**
  81. * Gets all sockets for a specified session id
  82. *
  83. * @param {object} payload - object containing the payload
  84. * @param {string} payload.sessionId - user session id
  85. * @returns {Promise} - returns promise (reject, resolve)
  86. */
  87. async SOCKETS_FROM_SESSION_ID(payload) {
  88. return new Promise(resolve => {
  89. const ns = IOModule._io.of("/");
  90. const sockets = [];
  91. if (ns) {
  92. return async.each(
  93. Object.keys(ns.connected),
  94. (id, next) => {
  95. const { session } = ns.connected[id];
  96. if (session.sessionId === payload.sessionId) sockets.push(session.sessionId);
  97. next();
  98. },
  99. () => {
  100. resolve({ sockets });
  101. }
  102. );
  103. }
  104. return resolve();
  105. });
  106. }
  107. /**
  108. * Returns any sockets for a specific user
  109. *
  110. * @param {object} payload - object that contains the payload
  111. * @param {string} payload.userId - the user id
  112. * @returns {Promise} - returns promise (reject, resolve)
  113. */
  114. async SOCKETS_FROM_USER(payload) {
  115. return new Promise((resolve, reject) => {
  116. const ns = IOModule._io.of("/");
  117. const sockets = [];
  118. if (ns) {
  119. return async.eachLimit(
  120. Object.keys(ns.connected),
  121. 1,
  122. (id, next) => {
  123. const { session } = ns.connected[id];
  124. if (session.sessionId) {
  125. CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId }, this)
  126. .then(session => {
  127. if (session && session.userId === payload.userId) sockets.push(ns.connected[id]);
  128. next();
  129. })
  130. .catch(err => next(err));
  131. } else next();
  132. },
  133. err => {
  134. if (err) return reject(err);
  135. return resolve({ sockets });
  136. }
  137. );
  138. }
  139. return resolve();
  140. });
  141. }
  142. /**
  143. * Returns any sockets from a specific ip address
  144. *
  145. * @param {object} payload - object that contains the payload
  146. * @param {string} payload.ip - the ip address in question
  147. * @returns {Promise} - returns promise (reject, resolve)
  148. */
  149. async SOCKETS_FROM_IP(payload) {
  150. return new Promise(resolve => {
  151. const ns = IOModule._io.of("/");
  152. const sockets = [];
  153. if (ns) {
  154. return async.each(
  155. Object.keys(ns.connected),
  156. (id, next) => {
  157. const { session } = ns.connected[id];
  158. CacheModule.runJob(
  159. "HGET",
  160. {
  161. table: "sessions",
  162. key: session.sessionId
  163. },
  164. this
  165. )
  166. .then(session => {
  167. if (session && ns.connected[id].ip === payload.ip) sockets.push(ns.connected[id]);
  168. next();
  169. })
  170. .catch(() => next());
  171. },
  172. () => {
  173. resolve(sockets);
  174. }
  175. );
  176. }
  177. return resolve();
  178. });
  179. }
  180. /**
  181. * Returns any sockets from a specific user without using redis/cache
  182. *
  183. * @param {object} payload - object that contains the payload
  184. * @param {string} payload.userId - the id of the user in question
  185. * @returns {Promise} - returns promise (reject, resolve)
  186. */
  187. async SOCKETS_FROM_USER_WITHOUT_CACHE(payload) {
  188. return new Promise(resolve => {
  189. const ns = IOModule._io.of("/");
  190. const sockets = [];
  191. if (ns) {
  192. return async.each(
  193. Object.keys(ns.connected),
  194. (id, next) => {
  195. const { session } = ns.connected[id];
  196. if (session.userId === payload.userId) sockets.push(ns.connected[id]);
  197. next();
  198. },
  199. () => {
  200. resolve({ sockets });
  201. }
  202. );
  203. }
  204. return resolve();
  205. });
  206. }
  207. /**
  208. * Allows a socket to leave any rooms they are connected to
  209. *
  210. * @param {object} payload - object that contains the payload
  211. * @param {string} payload.socketId - the id of the socket which should leave all their rooms
  212. * @returns {Promise} - returns promise (reject, resolve)
  213. */
  214. async SOCKET_LEAVE_ROOMS(payload) {
  215. const socket = await IOModule.runJob(
  216. "SOCKET_FROM_SESSION",
  217. {
  218. socketId: payload.socketId
  219. },
  220. this
  221. );
  222. return new Promise(resolve => {
  223. const { rooms } = socket;
  224. Object.keys(rooms).forEach(roomKey => {
  225. const room = rooms[roomKey];
  226. socket.leave(room);
  227. });
  228. return resolve();
  229. });
  230. }
  231. /**
  232. * Allows a socket to join a specified room
  233. *
  234. * @param {object} payload - object that contains the payload
  235. * @param {string} payload.socketId - the id of the socket which should join the room
  236. * @param {object} payload.room - the object representing the room the socket should join
  237. * @returns {Promise} - returns promise (reject, resolve)
  238. */
  239. async SOCKET_JOIN_ROOM(payload) {
  240. const socket = await IOModule.runJob(
  241. "SOCKET_FROM_SESSION",
  242. {
  243. socketId: payload.socketId
  244. },
  245. this
  246. );
  247. return new Promise(resolve => {
  248. const { rooms } = socket;
  249. Object.keys(rooms).forEach(roomKey => {
  250. const room = rooms[roomKey];
  251. socket.leave(room);
  252. });
  253. socket.join(payload.room);
  254. return resolve();
  255. });
  256. }
  257. // UNKNOWN
  258. // eslint-disable-next-line require-jsdoc
  259. async SOCKET_JOIN_SONG_ROOM(payload) {
  260. // socketId, room
  261. const socket = await IOModule.runJob(
  262. "SOCKET_FROM_SESSION",
  263. {
  264. socketId: payload.socketId
  265. },
  266. this
  267. );
  268. return new Promise(resolve => {
  269. const { rooms } = socket;
  270. Object.keys(rooms).forEach(roomKey => {
  271. const room = rooms[roomKey];
  272. if (room.indexOf("song.") !== -1) socket.leave(room);
  273. });
  274. socket.join(payload.room);
  275. return resolve();
  276. });
  277. }
  278. // UNKNOWN
  279. // eslint-disable-next-line require-jsdoc
  280. SOCKETS_JOIN_SONG_ROOM(payload) {
  281. // sockets, room
  282. return new Promise(resolve => {
  283. Object.keys(payload.sockets).forEach(socketKey => {
  284. const socket = payload.sockets[socketKey];
  285. const { rooms } = socket;
  286. Object.keys(rooms).forEach(roomKey => {
  287. const room = rooms[roomKey];
  288. if (room.indexOf("song.") !== -1) socket.leave(room);
  289. });
  290. socket.join(payload.room);
  291. });
  292. return resolve();
  293. });
  294. }
  295. // UNKNOWN
  296. // eslint-disable-next-line require-jsdoc
  297. SOCKETS_LEAVE_SONG_ROOMS(payload) {
  298. // sockets
  299. return new Promise(resolve => {
  300. Object.keys(payload.sockets).forEach(socketKey => {
  301. const socket = payload.sockets[socketKey];
  302. const { rooms } = socket;
  303. Object.keys(rooms).forEach(roomKey => {
  304. const room = rooms[roomKey];
  305. if (room.indexOf("song.") !== -1) socket.leave(room);
  306. });
  307. });
  308. resolve();
  309. });
  310. }
  311. /**
  312. * Emits arguments to any sockets that are in a specified a room
  313. *
  314. * @param {object} payload - object that contains the payload
  315. * @param {string} payload.room - the name of the room to emit arguments
  316. * @param {object} payload.args - any arguments to be emitted to the sockets in the specific room
  317. * @returns {Promise} - returns promise (reject, resolve)
  318. */
  319. async EMIT_TO_ROOM(payload) {
  320. return new Promise(resolve => {
  321. const { sockets } = IOModule._io.sockets;
  322. Object.keys(sockets).forEach(socketKey => {
  323. const socket = sockets[socketKey];
  324. if (socket.rooms[payload.room]) {
  325. socket.emit(...payload.args);
  326. }
  327. });
  328. return resolve();
  329. });
  330. }
  331. /**
  332. * Gets any sockets connected to a room
  333. *
  334. * @param {object} payload - object that contains the payload
  335. * @param {string} payload.room - the name of the room
  336. * @returns {Promise} - returns promise (reject, resolve)
  337. */
  338. async GET_ROOM_SOCKETS(payload) {
  339. return new Promise(resolve => {
  340. const { sockets } = IOModule._io.sockets;
  341. const roomSockets = [];
  342. Object.keys(sockets).forEach(socketKey => {
  343. const socket = sockets[socketKey];
  344. if (socket.rooms[payload.room]) roomSockets.push(socket);
  345. });
  346. return resolve(roomSockets);
  347. });
  348. }
  349. /**
  350. * Handles io.use
  351. *
  352. * @param {object} payload - object that contains the payload
  353. * @returns {Promise} - returns promise (reject, resolve)
  354. */
  355. async HANDLE_IO_USE(payload) {
  356. return new Promise(resolve => {
  357. const { socket, cb } = payload;
  358. let SID;
  359. socket.ip = socket.request.headers["x-forwarded-for"] || "0.0.0.0";
  360. return async.waterfall(
  361. [
  362. next => {
  363. if (!socket.request.headers.cookie) return next("No cookie exists yet.");
  364. return UtilsModule.runJob(
  365. "PARSE_COOKIES",
  366. {
  367. cookieString: socket.request.headers.cookie
  368. },
  369. this
  370. ).then(res => {
  371. SID = res[IOModule.SIDname];
  372. next(null);
  373. });
  374. },
  375. next => {
  376. if (!SID) return next("No SID.");
  377. return next();
  378. },
  379. next => {
  380. CacheModule.runJob("HGET", { table: "sessions", key: SID }, this)
  381. .then(session => {
  382. next(null, session);
  383. })
  384. .catch(next);
  385. },
  386. (session, next) => {
  387. if (!session) return next("No session found.");
  388. session.refreshDate = Date.now();
  389. socket.session = session;
  390. return CacheModule.runJob(
  391. "HSET",
  392. {
  393. table: "sessions",
  394. key: SID,
  395. value: session
  396. },
  397. this
  398. ).then(session => {
  399. next(null, session);
  400. });
  401. },
  402. (res, next) => {
  403. // check if a session's user / IP is banned
  404. PunishmentsModule.runJob("GET_PUNISHMENTS", {}, this)
  405. .then(punishments => {
  406. const isLoggedIn = !!(socket.session && socket.session.refreshDate);
  407. const userId = isLoggedIn ? socket.session.userId : null;
  408. const banishment = {
  409. banned: false,
  410. ban: 0
  411. };
  412. punishments.forEach(punishment => {
  413. if (punishment.expiresAt > banishment.ban) banishment.ban = punishment;
  414. if (punishment.type === "banUserId" && isLoggedIn && punishment.value === userId)
  415. banishment.banned = true;
  416. if (punishment.type === "banUserIp" && punishment.value === socket.ip)
  417. banishment.banned = true;
  418. });
  419. socket.banishment = banishment;
  420. next();
  421. })
  422. .catch(() => {
  423. next();
  424. });
  425. }
  426. ],
  427. () => {
  428. if (!socket.session) socket.session = { socketId: socket.id };
  429. else socket.session.socketId = socket.id;
  430. cb();
  431. resolve();
  432. }
  433. );
  434. });
  435. }
  436. /**
  437. * Handles io.connection
  438. *
  439. * @param {object} payload - object that contains the payload
  440. * @returns {Promise} - returns promise (reject, resolve)
  441. */
  442. async HANDLE_IO_CONNECTION(payload) {
  443. return new Promise(resolve => {
  444. const { socket } = payload;
  445. let sessionInfo = "";
  446. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  447. // if session is banned
  448. if (socket.banishment && socket.banishment.banned) {
  449. IOModule.log(
  450. "INFO",
  451. "IO_BANNED_CONNECTION",
  452. `A user tried to connect, but is currently banned. IP: ${socket.ip}.${sessionInfo}`
  453. );
  454. socket.emit("keep.event:banned", socket.banishment.ban);
  455. return socket.disconnect(true);
  456. }
  457. IOModule.log("INFO", "IO_CONNECTION", `User connected. IP: ${socket.ip}.${sessionInfo}`);
  458. // catch when the socket has been disconnected
  459. socket.on("disconnect", () => {
  460. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  461. IOModule.log("INFO", "IO_DISCONNECTION", `User disconnected. IP: ${socket.ip}.${sessionInfo}`);
  462. });
  463. socket.use((data, next) => {
  464. if (data.length === 0) return next(new Error("Not enough arguments specified."));
  465. if (typeof data[0] !== "string") return next(new Error("First argument must be a string."));
  466. const namespaceAction = data[0];
  467. if (
  468. !namespaceAction ||
  469. namespaceAction.indexOf(".") === -1 ||
  470. namespaceAction.indexOf(".") !== namespaceAction.lastIndexOf(".")
  471. )
  472. return next(new Error("Invalid first argument"));
  473. const namespace = data[0].split(".")[0];
  474. const action = data[0].split(".")[1];
  475. if (!namespace) return next(new Error("Invalid namespace."));
  476. if (!action) return next(new Error("Invalid action."));
  477. if (!IOModule.actions[namespace]) return next(new Error("Namespace not found."));
  478. if (!IOModule.actions[namespace][action]) return next(new Error("Action not found."));
  479. return next();
  480. });
  481. // catch errors on the socket (internal to socket.io)
  482. socket.on("error", console.error);
  483. if (socket.session.sessionId) {
  484. CacheModule.runJob("HGET", {
  485. table: "sessions",
  486. key: socket.session.sessionId
  487. })
  488. .then(session => {
  489. if (session && session.userId) {
  490. IOModule.userModel.findOne({ _id: session.userId }, (err, user) => {
  491. if (err || !user) return socket.emit("ready", false);
  492. let role = "";
  493. let username = "";
  494. let userId = "";
  495. if (user) {
  496. role = user.role;
  497. username = user.username;
  498. userId = session.userId;
  499. }
  500. return socket.emit("ready", true, role, username, userId);
  501. });
  502. } else socket.emit("ready", false);
  503. })
  504. .catch(() => {
  505. socket.emit("ready", false);
  506. });
  507. } else socket.emit("ready", false);
  508. // have the socket listen for each action
  509. Object.keys(IOModule.actions).forEach(namespace => {
  510. Object.keys(IOModule.actions[namespace]).forEach(action => {
  511. // the full name of the action
  512. const name = `${namespace}.${action}`;
  513. // listen for this action to be called
  514. socket.on(name, async (...args) => {
  515. IOModule.runJob("RUN_ACTION", { socket, namespace, action, args });
  516. /* let cb = args[args.length - 1];
  517. if (typeof cb !== "function")
  518. cb = () => {
  519. IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
  520. };
  521. else args.pop();
  522. if (this.getStatus() !== "READY") {
  523. IOModule.log(
  524. "INFO",
  525. "IO_REJECTED_ACTION",
  526. `A user tried to execute an action, but the IO module is currently not ready. Action: ${namespace}.${action}.`
  527. );
  528. return;
  529. }
  530. IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
  531. let failedGettingSession = false;
  532. // load the session from the cache
  533. if (socket.session.sessionId)
  534. await CacheModule.runJob("HGET", {
  535. table: "sessions",
  536. key: socket.session.sessionId
  537. })
  538. .then(session => {
  539. // make sure the sockets sessionId isn't set if there is no session
  540. if (socket.session.sessionId && session === null) delete socket.session.sessionId;
  541. })
  542. .catch(() => {
  543. failedGettingSession = true;
  544. if (typeof cb === "function")
  545. cb({
  546. status: "error",
  547. message: "An error occurred while obtaining your session"
  548. });
  549. });
  550. if (!failedGettingSession)
  551. try {
  552. // call the action, passing it the session, and the arguments socket.io passed us
  553. this.runJob("RUN_ACTION", { namespace, action, session: socket.session, args })
  554. .then(response => {
  555. if (typeof cb === "function") cb(response);
  556. })
  557. .catch(err => {
  558. if (typeof cb === "function") cb(err);
  559. });
  560. // actions[namespace][action].apply(
  561. // null,
  562. // [socket.session].concat(args).concat([
  563. // result => {
  564. // IOModule.log(
  565. // "INFO",
  566. // "IO_ACTION",
  567. // `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
  568. // );
  569. // // respond to the socket with our message
  570. // if (typeof cb === "function") cb(result);
  571. // }
  572. // ])
  573. // );
  574. } catch (err) {
  575. if (typeof cb === "function")
  576. cb({
  577. status: "error",
  578. message: "An error occurred while executing the specified action."
  579. });
  580. IOModule.log(
  581. "ERROR",
  582. "IO_ACTION_ERROR",
  583. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  584. );
  585. } */
  586. });
  587. });
  588. });
  589. return resolve();
  590. });
  591. }
  592. /**
  593. * Runs an action
  594. *
  595. * @param {object} payload - object that contains the payload
  596. * @returns {Promise} - returns promise (reject, resolve)
  597. */
  598. async RUN_ACTION(payload) {
  599. return new Promise((resolve, reject) => {
  600. const { socket, namespace, action, args } = payload;
  601. // the full name of the action
  602. const name = `${namespace}.${action}`;
  603. let cb = args[args.length - 1];
  604. if (typeof cb !== "function")
  605. cb = () => {
  606. IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
  607. };
  608. else args.pop();
  609. IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
  610. // load the session from the cache
  611. new Promise(resolve => {
  612. if (socket.session.sessionId)
  613. CacheModule.runJob("HGET", {
  614. table: "sessions",
  615. key: socket.session.sessionId
  616. })
  617. .then(session => {
  618. // make sure the sockets sessionId isn't set if there is no session
  619. if (socket.session.sessionId && session === null) delete socket.session.sessionId;
  620. resolve();
  621. })
  622. .catch(() => {
  623. if (typeof cb === "function")
  624. cb({
  625. status: "error",
  626. message: "An error occurred while obtaining your session"
  627. });
  628. reject(new Error("An error occurred while obtaining the session"));
  629. });
  630. else resolve();
  631. })
  632. .then(() => {
  633. // call the job that calls the action, passing it the session, and the arguments socket.io passed us
  634. IOModule.runJob("RUN_ACTION2", { session: socket.session, namespace, action, args }, this)
  635. .then(response => {
  636. cb(response);
  637. resolve();
  638. })
  639. .catch(err => {
  640. if (typeof cb === "function")
  641. cb({
  642. status: "error",
  643. message: "An error occurred while executing the specified action."
  644. });
  645. reject(err);
  646. IOModule.log(
  647. "ERROR",
  648. "IO_ACTION_ERROR",
  649. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  650. );
  651. });
  652. })
  653. .catch(reject);
  654. });
  655. }
  656. /**
  657. * Runs an action
  658. *
  659. * @param {object} payload - object that contains the payload
  660. * @returns {Promise} - returns promise (reject, resolve)
  661. */
  662. async RUN_ACTION2(payload) {
  663. return new Promise((resolve, reject) => {
  664. const { session, namespace, action, args } = payload;
  665. try {
  666. // call the the action, passing it the session, and the arguments socket.io passed us
  667. IOModule.actions[namespace][action].apply(
  668. this,
  669. [session].concat(args).concat([
  670. result => {
  671. IOModule.log(
  672. "INFO",
  673. "RUN_ACTION2",
  674. `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
  675. );
  676. resolve(result);
  677. }
  678. ])
  679. );
  680. } catch (err) {
  681. reject(err);
  682. IOModule.log(
  683. "ERROR",
  684. "IO_ACTION_ERROR",
  685. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  686. );
  687. }
  688. });
  689. }
  690. }
  691. export default new _IOModule();