io.js 21 KB


  1. /**
  2. * @file
  3. */
  4. import config from "config";
  5. import async from "async";
  6. import socketio from "socket.io";
  7. import CoreClass from "../core";
  // File-level module references shared by the job handlers below.
  // IOModule is assigned in the constructor; the others are looked up from the
  // module manager in initialize().
  let IOModule;

  let AppModule;
  let CacheModule;
  let DBModule;
  let PunishmentsModule;
  let UtilsModule;
  14. class _IOModule extends CoreClass {
  15. // eslint-disable-next-line require-jsdoc
  16. constructor() {
  17. super("io");
  18. IOModule = this;
  19. }
  20. /**
  21. * Initialises the io module
  22. *
  23. * @returns {Promise} - returns promise (reject, resolve)
  24. */
  25. async initialize() {
  26. this.setStage(1);
  27. AppModule = this.moduleManager.modules.app;
  28. CacheModule = this.moduleManager.modules.cache;
  29. UtilsModule = this.moduleManager.modules.utils;
  30. DBModule = this.moduleManager.modules.db;
  31. PunishmentsModule = this.moduleManager.modules.punishments;
  32. this.actions = (await import("./actions")).default;
  33. this.userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  34. this.setStage(2);
  35. this.SIDname = config.get("cookie.SIDname");
  36. // TODO: Check every 30s/, for all sockets, if they are still allowed to be in the rooms they are in, and on socket at all (permission changing/banning)
  37. const server = await AppModule.runJob("SERVER");
  38. this._io = socketio(server);
  39. // this._io.origins(config.get("cors.origin"));
  40. return new Promise(resolve => {
  41. this.setStage(3);
  42. this._io.use(async (socket, cb) => {
  43. IOModule.runJob("HANDLE_IO_USE", { socket, cb });
  44. });
  45. this.setStage(4);
  46. this._io.on("connection", async socket => {
  47. IOModule.runJob("HANDLE_IO_CONNECTION", { socket });
  48. });
  49. this.setStage(5);
  50. return resolve();
  51. });
  52. }
  53. /**
  54. * Returns the socket io variable
  55. *
  56. * @returns {Promise} - returns a promise (resolve, reject)
  57. */
  58. IO() {
  59. return new Promise(resolve => {
  60. resolve(IOModule._io);
  61. });
  62. }
  63. /**
  64. * Returns whether there is a socket for a session id or not
  65. *
  66. * @param {object} payload - object containing the payload
  67. * @param {string} payload.sessionId - user session id
  68. * @returns {Promise} - returns promise (reject, resolve)
  69. */
  70. async SOCKET_FROM_SESSION(payload) {
  71. // socketId
  72. return new Promise((resolve, reject) => {
  73. const ns = IOModule._io.of("/");
  74. if (ns) {
  75. return resolve(ns.connected[payload.socketId]);
  76. }
  77. return reject();
  78. });
  79. }
  80. /**
  81. * Gets all sockets for a specified session id
  82. *
  83. * @param {object} payload - object containing the payload
  84. * @param {string} payload.sessionId - user session id
  85. * @returns {Promise} - returns promise (reject, resolve)
  86. */
  87. async SOCKETS_FROM_SESSION_ID(payload) {
  88. return new Promise(resolve => {
  89. const ns = IOModule._io.of("/");
  90. const sockets = [];
  91. if (ns) {
  92. return async.each(
  93. Object.keys(ns.connected),
  94. (id, next) => {
  95. const { session } = ns.connected[id];
  96. if (session.sessionId === payload.sessionId) sockets.push(session.sessionId);
  97. next();
  98. },
  99. () => {
  100. resolve({ sockets });
  101. }
  102. );
  103. }
  104. return resolve();
  105. });
  106. }
  107. /**
  108. * Returns any sockets for a specific user
  109. *
  110. * @param {object} payload - object that contains the payload
  111. * @param {string} payload.userId - the user id
  112. * @returns {Promise} - returns promise (reject, resolve)
  113. */
  114. async SOCKETS_FROM_USER(payload) {
  115. return new Promise((resolve, reject) => {
  116. const ns = IOModule._io.of("/");
  117. const sockets = [];
  118. if (ns) {
  119. return async.eachLimit(
  120. Object.keys(ns.connected),
  121. 1,
  122. (id, next) => {
  123. const { session } = ns.connected[id];
  124. CacheModule.runJob(
  125. "HGET",
  126. {
  127. table: "sessions",
  128. key: session.sessionId
  129. },
  130. this
  131. )
  132. .then(session => {
  133. if (session && session.userId === payload.userId) sockets.push(ns.connected[id]);
  134. next();
  135. })
  136. .catch(err => {
  137. next(err);
  138. });
  139. },
  140. err => {
  141. if (err) return reject(err);
  142. return resolve({ sockets });
  143. }
  144. );
  145. }
  146. return resolve();
  147. });
  148. }
  149. /**
  150. * Returns any sockets from a specific ip address
  151. *
  152. * @param {object} payload - object that contains the payload
  153. * @param {string} payload.ip - the ip address in question
  154. * @returns {Promise} - returns promise (reject, resolve)
  155. */
  156. async SOCKETS_FROM_IP(payload) {
  157. return new Promise(resolve => {
  158. const ns = IOModule._io.of("/");
  159. const sockets = [];
  160. if (ns) {
  161. return async.each(
  162. Object.keys(ns.connected),
  163. (id, next) => {
  164. const { session } = ns.connected[id];
  165. CacheModule.runJob(
  166. "HGET",
  167. {
  168. table: "sessions",
  169. key: session.sessionId
  170. },
  171. this
  172. )
  173. .then(session => {
  174. if (session && ns.connected[id].ip === payload.ip) sockets.push(ns.connected[id]);
  175. next();
  176. })
  177. .catch(() => next());
  178. },
  179. () => {
  180. resolve({ sockets });
  181. }
  182. );
  183. }
  184. return resolve();
  185. });
  186. }
  187. /**
  188. * Returns any sockets from a specific user without using redis/cache
  189. *
  190. * @param {object} payload - object that contains the payload
  191. * @param {string} payload.userId - the id of the user in question
  192. * @returns {Promise} - returns promise (reject, resolve)
  193. */
  194. async SOCKETS_FROM_USER_WITHOUT_CACHE(payload) {
  195. return new Promise(resolve => {
  196. const ns = IOModule._io.of("/");
  197. const sockets = [];
  198. if (ns) {
  199. return async.each(
  200. Object.keys(ns.connected),
  201. (id, next) => {
  202. const { session } = ns.connected[id];
  203. if (session.userId === payload.userId) sockets.push(ns.connected[id]);
  204. next();
  205. },
  206. () => {
  207. resolve({ sockets });
  208. }
  209. );
  210. }
  211. return resolve();
  212. });
  213. }
  214. /**
  215. * Allows a socket to leave any rooms they are connected to
  216. *
  217. * @param {object} payload - object that contains the payload
  218. * @param {string} payload.socketId - the id of the socket which should leave all their rooms
  219. * @returns {Promise} - returns promise (reject, resolve)
  220. */
  221. async SOCKET_LEAVE_ROOMS(payload) {
  222. const socket = await IOModule.runJob(
  223. "SOCKET_FROM_SESSION",
  224. {
  225. socketId: payload.socketId
  226. },
  227. this
  228. );
  229. return new Promise(resolve => {
  230. const { rooms } = socket;
  231. Object.keys(rooms).forEach(roomKey => {
  232. const room = rooms[roomKey];
  233. socket.leave(room);
  234. });
  235. return resolve();
  236. });
  237. }
  238. /**
  239. * Allows a socket to join a specified room
  240. *
  241. * @param {object} payload - object that contains the payload
  242. * @param {string} payload.socketId - the id of the socket which should join the room
  243. * @param {object} payload.room - the object representing the room the socket should join
  244. * @returns {Promise} - returns promise (reject, resolve)
  245. */
  246. async SOCKET_JOIN_ROOM(payload) {
  247. const socket = await IOModule.runJob(
  248. "SOCKET_FROM_SESSION",
  249. {
  250. socketId: payload.socketId
  251. },
  252. this
  253. );
  254. return new Promise(resolve => {
  255. const { rooms } = socket;
  256. Object.keys(rooms).forEach(roomKey => {
  257. const room = rooms[roomKey];
  258. socket.leave(room);
  259. });
  260. socket.join(payload.room);
  261. return resolve();
  262. });
  263. }
  264. // UNKNOWN
  265. // eslint-disable-next-line require-jsdoc
  266. async SOCKET_JOIN_SONG_ROOM(payload) {
  267. // socketId, room
  268. const socket = await IOModule.runJob(
  269. "SOCKET_FROM_SESSION",
  270. {
  271. socketId: payload.socketId
  272. },
  273. this
  274. );
  275. return new Promise(resolve => {
  276. const { rooms } = socket;
  277. Object.keys(rooms).forEach(roomKey => {
  278. const room = rooms[roomKey];
  279. if (room.indexOf("song.") !== -1) socket.leave(room);
  280. });
  281. socket.join(payload.room);
  282. return resolve();
  283. });
  284. }
  285. // UNKNOWN
  286. // eslint-disable-next-line require-jsdoc
  287. SOCKETS_JOIN_SONG_ROOM(payload) {
  288. // sockets, room
  289. return new Promise(resolve => {
  290. Object.keys(payload.sockets).forEach(socketKey => {
  291. const socket = payload.sockets[socketKey];
  292. const { rooms } = socket;
  293. Object.keys(rooms).forEach(roomKey => {
  294. const room = rooms[roomKey];
  295. if (room.indexOf("song.") !== -1) socket.leave(room);
  296. });
  297. socket.join(payload.room);
  298. });
  299. return resolve();
  300. });
  301. }
  302. // UNKNOWN
  303. // eslint-disable-next-line require-jsdoc
  304. SOCKETS_LEAVE_SONG_ROOMS(payload) {
  305. // sockets
  306. return new Promise(resolve => {
  307. Object.keys(payload.sockets).forEach(socketKey => {
  308. const socket = payload.sockets[socketKey];
  309. const { rooms } = socket;
  310. Object.keys(rooms).forEach(roomKey => {
  311. const room = rooms[roomKey];
  312. if (room.indexOf("song.") !== -1) socket.leave(room);
  313. });
  314. });
  315. resolve();
  316. });
  317. }
  318. /**
  319. * Emits arguments to any sockets that are in a specified a room
  320. *
  321. * @param {object} payload - object that contains the payload
  322. * @param {string} payload.room - the name of the room to emit arguments
  323. * @param {object} payload.args - any arguments to be emitted to the sockets in the specific room
  324. * @returns {Promise} - returns promise (reject, resolve)
  325. */
  326. async EMIT_TO_ROOM(payload) {
  327. return new Promise(resolve => {
  328. const { sockets } = IOModule._io.sockets;
  329. Object.keys(sockets).forEach(socketKey => {
  330. const socket = sockets[socketKey];
  331. if (socket.rooms[payload.room]) {
  332. socket.emit(...payload.args);
  333. }
  334. });
  335. return resolve();
  336. });
  337. }
  338. /**
  339. * Gets any sockets connected to a room
  340. *
  341. * @param {object} payload - object that contains the payload
  342. * @param {string} payload.room - the name of the room
  343. * @returns {Promise} - returns promise (reject, resolve)
  344. */
  345. async GET_ROOM_SOCKETS(payload) {
  346. return new Promise(resolve => {
  347. const { sockets } = IOModule._io.sockets;
  348. const roomSockets = [];
  349. Object.keys(sockets).forEach(socketKey => {
  350. const socket = sockets[socketKey];
  351. if (socket.rooms[payload.room]) roomSockets.push(socket);
  352. });
  353. return resolve(roomSockets);
  354. });
  355. }
  356. /**
  357. * Handles io.use
  358. *
  359. * @param {object} payload - object that contains the payload
  360. * @returns {Promise} - returns promise (reject, resolve)
  361. */
  362. async HANDLE_IO_USE(payload) {
  363. return new Promise(resolve => {
  364. const { socket, cb } = payload;
  365. let SID;
  366. socket.ip = socket.request.headers["x-forwarded-for"] || "0.0.0.0";
  367. return async.waterfall(
  368. [
  369. next => {
  370. if (!socket.request.headers.cookie) return next("No cookie exists yet.");
  371. return UtilsModule.runJob(
  372. "PARSE_COOKIES",
  373. {
  374. cookieString: socket.request.headers.cookie
  375. },
  376. this
  377. ).then(res => {
  378. SID = res[IOModule.SIDname];
  379. next(null);
  380. });
  381. },
  382. next => {
  383. if (!SID) return next("No SID.");
  384. return next();
  385. },
  386. next => {
  387. CacheModule.runJob("HGET", { table: "sessions", key: SID }, this)
  388. .then(session => {
  389. next(null, session);
  390. })
  391. .catch(next);
  392. },
  393. (session, next) => {
  394. if (!session) return next("No session found.");
  395. session.refreshDate = Date.now();
  396. socket.session = session;
  397. return CacheModule.runJob(
  398. "HSET",
  399. {
  400. table: "sessions",
  401. key: SID,
  402. value: session
  403. },
  404. this
  405. ).then(session => {
  406. next(null, session);
  407. });
  408. },
  409. (res, next) => {
  410. // check if a session's user / IP is banned
  411. PunishmentsModule.runJob("GET_PUNISHMENTS", {}, this)
  412. .then(punishments => {
  413. const isLoggedIn = !!(socket.session && socket.session.refreshDate);
  414. const userId = isLoggedIn ? socket.session.userId : null;
  415. const banishment = {
  416. banned: false,
  417. ban: 0
  418. };
  419. punishments.forEach(punishment => {
  420. if (punishment.expiresAt > banishment.ban) banishment.ban = punishment;
  421. if (punishment.type === "banUserId" && isLoggedIn && punishment.value === userId)
  422. banishment.banned = true;
  423. if (punishment.type === "banUserIp" && punishment.value === socket.ip)
  424. banishment.banned = true;
  425. });
  426. socket.banishment = banishment;
  427. next();
  428. })
  429. .catch(() => {
  430. next();
  431. });
  432. }
  433. ],
  434. () => {
  435. if (!socket.session) socket.session = { socketId: socket.id };
  436. else socket.session.socketId = socket.id;
  437. cb();
  438. resolve();
  439. }
  440. );
  441. });
  442. }
  443. /**
  444. * Handles io.connection
  445. *
  446. * @param {object} payload - object that contains the payload
  447. * @returns {Promise} - returns promise (reject, resolve)
  448. */
  449. async HANDLE_IO_CONNECTION(payload) {
  450. return new Promise(resolve => {
  451. const { socket } = payload;
  452. let sessionInfo = "";
  453. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  454. // if session is banned
  455. if (socket.banishment && socket.banishment.banned) {
  456. IOModule.log(
  457. "INFO",
  458. "IO_BANNED_CONNECTION",
  459. `A user tried to connect, but is currently banned. IP: ${socket.ip}.${sessionInfo}`
  460. );
  461. socket.emit("keep.event:banned", socket.banishment.ban);
  462. return socket.disconnect(true);
  463. }
  464. IOModule.log("INFO", "IO_CONNECTION", `User connected. IP: ${socket.ip}.${sessionInfo}`);
  465. // catch when the socket has been disconnected
  466. socket.on("disconnect", () => {
  467. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  468. IOModule.log("INFO", "IO_DISCONNECTION", `User disconnected. IP: ${socket.ip}.${sessionInfo}`);
  469. });
  470. socket.use((data, next) => {
  471. if (data.length === 0) return next(new Error("Not enough arguments specified."));
  472. if (typeof data[0] !== "string") return next(new Error("First argument must be a string."));
  473. const namespaceAction = data[0];
  474. if (
  475. !namespaceAction ||
  476. namespaceAction.indexOf(".") === -1 ||
  477. namespaceAction.indexOf(".") !== namespaceAction.lastIndexOf(".")
  478. )
  479. return next(new Error("Invalid first argument"));
  480. const namespace = data[0].split(".")[0];
  481. const action = data[0].split(".")[1];
  482. if (!namespace) return next(new Error("Invalid namespace."));
  483. if (!action) return next(new Error("Invalid action."));
  484. if (!IOModule.actions[namespace]) return next(new Error("Namespace not found."));
  485. if (!IOModule.actions[namespace][action]) return next(new Error("Action not found."));
  486. return next();
  487. });
  488. // catch errors on the socket (internal to socket.io)
  489. socket.on("error", console.error);
  490. if (socket.session.sessionId) {
  491. CacheModule.runJob("HGET", {
  492. table: "sessions",
  493. key: socket.session.sessionId
  494. })
  495. .then(session => {
  496. if (session && session.userId) {
  497. IOModule.userModel.findOne({ _id: session.userId }, (err, user) => {
  498. if (err || !user) return socket.emit("ready", false);
  499. let role = "";
  500. let username = "";
  501. let userId = "";
  502. if (user) {
  503. role = user.role;
  504. username = user.username;
  505. userId = session.userId;
  506. }
  507. return socket.emit("ready", true, role, username, userId);
  508. });
  509. } else socket.emit("ready", false);
  510. })
  511. .catch(() => {
  512. socket.emit("ready", false);
  513. });
  514. } else socket.emit("ready", false);
  515. // have the socket listen for each action
  516. Object.keys(IOModule.actions).forEach(namespace => {
  517. Object.keys(IOModule.actions[namespace]).forEach(action => {
  518. // the full name of the action
  519. const name = `${namespace}.${action}`;
  520. // listen for this action to be called
  521. socket.on(name, async (...args) => {
  522. IOModule.runJob("RUN_ACTION", { socket, namespace, action, args });
  523. /* let cb = args[args.length - 1];
  524. if (typeof cb !== "function")
  525. cb = () => {
  526. IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
  527. };
  528. else args.pop();
  529. if (this.getStatus() !== "READY") {
  530. IOModule.log(
  531. "INFO",
  532. "IO_REJECTED_ACTION",
  533. `A user tried to execute an action, but the IO module is currently not ready. Action: ${namespace}.${action}.`
  534. );
  535. return;
  536. }
  537. IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
  538. let failedGettingSession = false;
  539. // load the session from the cache
  540. if (socket.session.sessionId)
  541. await CacheModule.runJob("HGET", {
  542. table: "sessions",
  543. key: socket.session.sessionId
  544. })
  545. .then(session => {
  546. // make sure the sockets sessionId isn't set if there is no session
  547. if (socket.session.sessionId && session === null) delete socket.session.sessionId;
  548. })
  549. .catch(() => {
  550. failedGettingSession = true;
  551. if (typeof cb === "function")
  552. cb({
  553. status: "error",
  554. message: "An error occurred while obtaining your session"
  555. });
  556. });
  557. if (!failedGettingSession)
  558. try {
  559. // call the action, passing it the session, and the arguments socket.io passed us
  560. this.runJob("RUN_ACTION", { namespace, action, session: socket.session, args })
  561. .then(response => {
  562. if (typeof cb === "function") cb(response);
  563. })
  564. .catch(err => {
  565. if (typeof cb === "function") cb(err);
  566. });
  567. // actions[namespace][action].apply(
  568. // null,
  569. // [socket.session].concat(args).concat([
  570. // result => {
  571. // IOModule.log(
  572. // "INFO",
  573. // "IO_ACTION",
  574. // `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
  575. // );
  576. // // respond to the socket with our message
  577. // if (typeof cb === "function") cb(result);
  578. // }
  579. // ])
  580. // );
  581. } catch (err) {
  582. if (typeof cb === "function")
  583. cb({
  584. status: "error",
  585. message: "An error occurred while executing the specified action."
  586. });
  587. IOModule.log(
  588. "ERROR",
  589. "IO_ACTION_ERROR",
  590. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  591. );
  592. } */
  593. });
  594. });
  595. });
  596. return resolve();
  597. });
  598. }
  599. /**
  600. * Runs an action
  601. *
  602. * @param {object} payload - object that contains the payload
  603. * @returns {Promise} - returns promise (reject, resolve)
  604. */
  605. async RUN_ACTION(payload) {
  606. return new Promise((resolve, reject) => {
  607. const { socket, namespace, action, args } = payload;
  608. // the full name of the action
  609. const name = `${namespace}.${action}`;
  610. let cb = args[args.length - 1];
  611. if (typeof cb !== "function")
  612. cb = () => {
  613. IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
  614. };
  615. else args.pop();
  616. IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
  617. // load the session from the cache
  618. new Promise(resolve => {
  619. if (socket.session.sessionId)
  620. CacheModule.runJob("HGET", {
  621. table: "sessions",
  622. key: socket.session.sessionId
  623. })
  624. .then(session => {
  625. // make sure the sockets sessionId isn't set if there is no session
  626. if (socket.session.sessionId && session === null) delete socket.session.sessionId;
  627. resolve();
  628. })
  629. .catch(() => {
  630. if (typeof cb === "function")
  631. cb({
  632. status: "error",
  633. message: "An error occurred while obtaining your session"
  634. });
  635. reject(new Error("An error occurred while obtaining the session"));
  636. });
  637. else resolve();
  638. })
  639. .then(() => {
  640. // call the job that calls the action, passing it the session, and the arguments socket.io passed us
  641. IOModule.runJob("RUN_ACTION2", { session: socket.session, namespace, action, args }, this)
  642. .then(response => {
  643. cb(response);
  644. resolve();
  645. })
  646. .catch(err => {
  647. if (typeof cb === "function")
  648. cb({
  649. status: "error",
  650. message: "An error occurred while executing the specified action."
  651. });
  652. reject(err);
  653. IOModule.log(
  654. "ERROR",
  655. "IO_ACTION_ERROR",
  656. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  657. );
  658. });
  659. })
  660. .catch(reject);
  661. });
  662. }
  663. /**
  664. * Runs an action
  665. *
  666. * @param {object} payload - object that contains the payload
  667. * @returns {Promise} - returns promise (reject, resolve)
  668. */
  669. async RUN_ACTION2(payload) {
  670. return new Promise((resolve, reject) => {
  671. const { session, namespace, action, args } = payload;
  672. try {
  673. // call the the action, passing it the session, and the arguments socket.io passed us
  674. IOModule.actions[namespace][action].apply(
  675. this,
  676. [session].concat(args).concat([
  677. result => {
  678. IOModule.log(
  679. "INFO",
  680. "RUN_ACTION2",
  681. `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
  682. );
  683. resolve(result);
  684. }
  685. ])
  686. );
  687. } catch (err) {
  688. reject(err);
  689. IOModule.log(
  690. "ERROR",
  691. "IO_ACTION_ERROR",
  692. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  693. );
  694. }
  695. });
  696. }
  697. }
  // Single shared instance of the io module; constructing it also sets the
  // file-level IOModule reference.
  const ioModuleInstance = new _IOModule();

  export default ioModuleInstance;