io.js 21 KB


  1. /**
  2. * @file
  3. */
  4. import config from "config";
  5. import async from "async";
  6. import socketio from "socket.io";
  7. import CoreClass from "../core";
  // Module-level handles shared by every job in this file.
  // IOModule is assigned in the _IOModule constructor; the remaining handles are
  // assigned in initialize() from this.moduleManager.modules.
  let IOModule;
  let AppModule;
  let CacheModule;
  let UtilsModule;
  let DBModule;
  let PunishmentsModule;
  14. class _IOModule extends CoreClass {
  // eslint-disable-next-line require-jsdoc
  constructor() {
    // Pass the module name "io" to the CoreClass constructor.
    super("io");
    // Keep a module-level reference to this instance; the jobs below use it instead of `this`.
    IOModule = this;
  }
  20. /**
  21. * Initialises the io module
  22. *
  23. * @returns {Promise} - returns promise (reject, resolve)
  24. */
  25. async initialize() {
  26. this.setStage(1);
  27. AppModule = this.moduleManager.modules.app;
  28. CacheModule = this.moduleManager.modules.cache;
  29. UtilsModule = this.moduleManager.modules.utils;
  30. DBModule = this.moduleManager.modules.db;
  31. PunishmentsModule = this.moduleManager.modules.punishments;
  32. this.actions = (await import("./actions")).default;
  33. this.userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  34. this.setStage(2);
  35. this.SIDname = config.get("cookie.SIDname");
  36. // TODO: Check every 30s/, for all sockets, if they are still allowed to be in the rooms they are in, and on socket at all (permission changing/banning)
  37. const server = await AppModule.runJob("SERVER");
  38. this._io = socketio(server);
  39. return new Promise(resolve => {
  40. this.setStage(3);
  41. this._io.use(async (socket, cb) => {
  42. IOModule.runJob("HANDLE_IO_USE", { socket, cb });
  43. });
  44. this.setStage(4);
  45. this._io.on("connection", async socket => {
  46. IOModule.runJob("HANDLE_IO_CONNECTION", { socket });
  47. });
  48. this.setStage(5);
  49. return resolve();
  50. });
  51. }
  52. /**
  53. * Returns the socket io variable
  54. *
  55. * @returns {Promise} - returns a promise (resolve, reject)
  56. */
  57. IO() {
  58. return new Promise(resolve => {
  59. resolve(IOModule._io);
  60. });
  61. }
  62. /**
  63. * Returns whether there is a socket for a session id or not
  64. *
  65. * @param {object} payload - object containing the payload
  66. * @param {string} payload.sessionId - user session id
  67. * @returns {Promise} - returns promise (reject, resolve)
  68. */
  69. async SOCKET_FROM_SESSION(payload) {
  70. // socketId
  71. return new Promise((resolve, reject) => {
  72. const ns = IOModule._io.of("/");
  73. if (ns) {
  74. return resolve(ns.connected[payload.socketId]);
  75. }
  76. return reject();
  77. });
  78. }
  79. /**
  80. * Gets all sockets for a specified session id
  81. *
  82. * @param {object} payload - object containing the payload
  83. * @param {string} payload.sessionId - user session id
  84. * @returns {Promise} - returns promise (reject, resolve)
  85. */
  86. async SOCKETS_FROM_SESSION_ID(payload) {
  87. return new Promise(resolve => {
  88. const ns = IOModule._io.of("/");
  89. const sockets = [];
  90. if (ns) {
  91. return async.each(
  92. Object.keys(ns.connected),
  93. (id, next) => {
  94. const { session } = ns.connected[id];
  95. if (session.sessionId === payload.sessionId) sockets.push(session.sessionId);
  96. next();
  97. },
  98. () => {
  99. resolve({ sockets });
  100. }
  101. );
  102. }
  103. return resolve();
  104. });
  105. }
  106. /**
  107. * Returns any sockets for a specific user
  108. *
  109. * @param {object} payload - object that contains the payload
  110. * @param {string} payload.userId - the user id
  111. * @returns {Promise} - returns promise (reject, resolve)
  112. */
  113. async SOCKETS_FROM_USER(payload) {
  114. return new Promise((resolve, reject) => {
  115. const ns = IOModule._io.of("/");
  116. const sockets = [];
  117. if (ns) {
  118. return async.eachLimit(
  119. Object.keys(ns.connected),
  120. 1,
  121. (id, next) => {
  122. const { session } = ns.connected[id];
  123. CacheModule.runJob(
  124. "HGET",
  125. {
  126. table: "sessions",
  127. key: session.sessionId
  128. },
  129. this
  130. )
  131. .then(session => {
  132. if (session && session.userId === payload.userId) sockets.push(ns.connected[id]);
  133. next();
  134. })
  135. .catch(err => {
  136. next(err);
  137. });
  138. },
  139. err => {
  140. if (err) return reject(err);
  141. return resolve({ sockets });
  142. }
  143. );
  144. }
  145. return resolve();
  146. });
  147. }
  148. /**
  149. * Returns any sockets from a specific ip address
  150. *
  151. * @param {object} payload - object that contains the payload
  152. * @param {string} payload.ip - the ip address in question
  153. * @returns {Promise} - returns promise (reject, resolve)
  154. */
  155. async SOCKETS_FROM_IP(payload) {
  156. return new Promise(resolve => {
  157. const ns = IOModule._io.of("/");
  158. const sockets = [];
  159. if (ns) {
  160. return async.each(
  161. Object.keys(ns.connected),
  162. (id, next) => {
  163. const { session } = ns.connected[id];
  164. CacheModule.runJob(
  165. "HGET",
  166. {
  167. table: "sessions",
  168. key: session.sessionId
  169. },
  170. this
  171. )
  172. .then(session => {
  173. if (session && ns.connected[id].ip === payload.ip) sockets.push(ns.connected[id]);
  174. next();
  175. })
  176. .catch(() => next());
  177. },
  178. () => {
  179. resolve({ sockets });
  180. }
  181. );
  182. }
  183. return resolve();
  184. });
  185. }
  186. /**
  187. * Returns any sockets from a specific user without using redis/cache
  188. *
  189. * @param {object} payload - object that contains the payload
  190. * @param {string} payload.userId - the id of the user in question
  191. * @returns {Promise} - returns promise (reject, resolve)
  192. */
  193. async SOCKETS_FROM_USER_WITHOUT_CACHE(payload) {
  194. return new Promise(resolve => {
  195. const ns = IOModule._io.of("/");
  196. const sockets = [];
  197. if (ns) {
  198. return async.each(
  199. Object.keys(ns.connected),
  200. (id, next) => {
  201. const { session } = ns.connected[id];
  202. if (session.userId === payload.userId) sockets.push(ns.connected[id]);
  203. next();
  204. },
  205. () => {
  206. resolve({ sockets });
  207. }
  208. );
  209. }
  210. return resolve();
  211. });
  212. }
  213. /**
  214. * Allows a socket to leave any rooms they are connected to
  215. *
  216. * @param {object} payload - object that contains the payload
  217. * @param {string} payload.socketId - the id of the socket which should leave all their rooms
  218. * @returns {Promise} - returns promise (reject, resolve)
  219. */
  220. async SOCKET_LEAVE_ROOMS(payload) {
  221. const socket = await IOModule.runJob(
  222. "SOCKET_FROM_SESSION",
  223. {
  224. socketId: payload.socketId
  225. },
  226. this
  227. );
  228. return new Promise(resolve => {
  229. const { rooms } = socket;
  230. Object.keys(rooms).forEach(roomKey => {
  231. const room = rooms[roomKey];
  232. socket.leave(room);
  233. });
  234. return resolve();
  235. });
  236. }
  237. /**
  238. * Allows a socket to join a specified room
  239. *
  240. * @param {object} payload - object that contains the payload
  241. * @param {string} payload.socketId - the id of the socket which should join the room
  242. * @param {object} payload.room - the object representing the room the socket should join
  243. * @returns {Promise} - returns promise (reject, resolve)
  244. */
  245. async SOCKET_JOIN_ROOM(payload) {
  246. const socket = await IOModule.runJob(
  247. "SOCKET_FROM_SESSION",
  248. {
  249. socketId: payload.socketId
  250. },
  251. this
  252. );
  253. return new Promise(resolve => {
  254. const { rooms } = socket;
  255. Object.keys(rooms).forEach(roomKey => {
  256. const room = rooms[roomKey];
  257. socket.leave(room);
  258. });
  259. socket.join(payload.room);
  260. return resolve();
  261. });
  262. }
  263. // UNKNOWN
  264. // eslint-disable-next-line require-jsdoc
  265. async SOCKET_JOIN_SONG_ROOM(payload) {
  266. // socketId, room
  267. const socket = await IOModule.runJob(
  268. "SOCKET_FROM_SESSION",
  269. {
  270. socketId: payload.socketId
  271. },
  272. this
  273. );
  274. return new Promise(resolve => {
  275. const { rooms } = socket;
  276. Object.keys(rooms).forEach(roomKey => {
  277. const room = rooms[roomKey];
  278. if (room.indexOf("song.") !== -1) socket.leave(room);
  279. });
  280. socket.join(payload.room);
  281. return resolve();
  282. });
  283. }
  284. // UNKNOWN
  285. // eslint-disable-next-line require-jsdoc
  286. SOCKETS_JOIN_SONG_ROOM(payload) {
  287. // sockets, room
  288. return new Promise(resolve => {
  289. Object.keys(payload.sockets).forEach(socketKey => {
  290. const socket = payload.sockets[socketKey];
  291. const { rooms } = socket;
  292. Object.keys(rooms).forEach(roomKey => {
  293. const room = rooms[roomKey];
  294. if (room.indexOf("song.") !== -1) socket.leave(room);
  295. });
  296. socket.join(payload.room);
  297. });
  298. return resolve();
  299. });
  300. }
  301. // UNKNOWN
  302. // eslint-disable-next-line require-jsdoc
  303. SOCKETS_LEAVE_SONG_ROOMS(payload) {
  304. // sockets
  305. return new Promise(resolve => {
  306. Object.keys(payload.sockets).forEach(socketKey => {
  307. const socket = payload.sockets[socketKey];
  308. const { rooms } = socket;
  309. Object.keys(rooms).forEach(roomKey => {
  310. const room = rooms[roomKey];
  311. if (room.indexOf("song.") !== -1) socket.leave(room);
  312. });
  313. });
  314. resolve();
  315. });
  316. }
  317. /**
  318. * Emits arguments to any sockets that are in a specified a room
  319. *
  320. * @param {object} payload - object that contains the payload
  321. * @param {string} payload.room - the name of the room to emit arguments
  322. * @param {object} payload.args - any arguments to be emitted to the sockets in the specific room
  323. * @returns {Promise} - returns promise (reject, resolve)
  324. */
  325. async EMIT_TO_ROOM(payload) {
  326. return new Promise(resolve => {
  327. const { sockets } = IOModule._io.sockets;
  328. Object.keys(sockets).forEach(socketKey => {
  329. const socket = sockets[socketKey];
  330. if (socket.rooms[payload.room]) {
  331. socket.emit(...payload.args);
  332. }
  333. });
  334. return resolve();
  335. });
  336. }
  337. /**
  338. * Gets any sockets connected to a room
  339. *
  340. * @param {object} payload - object that contains the payload
  341. * @param {string} payload.room - the name of the room
  342. * @returns {Promise} - returns promise (reject, resolve)
  343. */
  344. async GET_ROOM_SOCKETS(payload) {
  345. return new Promise(resolve => {
  346. const { sockets } = IOModule._io.sockets;
  347. const roomSockets = [];
  348. Object.keys(sockets).forEach(socketKey => {
  349. const socket = sockets[socketKey];
  350. if (socket.rooms[payload.room]) roomSockets.push(socket);
  351. });
  352. return resolve(roomSockets);
  353. });
  354. }
  355. /**
  356. * Handles io.use
  357. *
  358. * @param {object} payload - object that contains the payload
  359. * @returns {Promise} - returns promise (reject, resolve)
  360. */
  361. async HANDLE_IO_USE(payload) {
  362. return new Promise(resolve => {
  363. const { socket, cb } = payload;
  364. let SID;
  365. socket.ip = socket.request.headers["x-forwarded-for"] || "0.0.0.0";
  366. return async.waterfall(
  367. [
  368. next => {
  369. if (!socket.request.headers.cookie) return next("No cookie exists yet.");
  370. return UtilsModule.runJob(
  371. "PARSE_COOKIES",
  372. {
  373. cookieString: socket.request.headers.cookie
  374. },
  375. this
  376. ).then(res => {
  377. SID = res[IOModule.SIDname];
  378. next(null);
  379. });
  380. },
  381. next => {
  382. if (!SID) return next("No SID.");
  383. return next();
  384. },
  385. next => {
  386. CacheModule.runJob("HGET", { table: "sessions", key: SID }, this)
  387. .then(session => {
  388. next(null, session);
  389. })
  390. .catch(next);
  391. },
  392. (session, next) => {
  393. if (!session) return next("No session found.");
  394. session.refreshDate = Date.now();
  395. socket.session = session;
  396. return CacheModule.runJob(
  397. "HSET",
  398. {
  399. table: "sessions",
  400. key: SID,
  401. value: session
  402. },
  403. this
  404. ).then(session => {
  405. next(null, session);
  406. });
  407. },
  408. (res, next) => {
  409. // check if a session's user / IP is banned
  410. PunishmentsModule.runJob("GET_PUNISHMENTS", {}, this)
  411. .then(punishments => {
  412. const isLoggedIn = !!(socket.session && socket.session.refreshDate);
  413. const userId = isLoggedIn ? socket.session.userId : null;
  414. const banishment = {
  415. banned: false,
  416. ban: 0
  417. };
  418. punishments.forEach(punishment => {
  419. if (punishment.expiresAt > banishment.ban) banishment.ban = punishment;
  420. if (punishment.type === "banUserId" && isLoggedIn && punishment.value === userId)
  421. banishment.banned = true;
  422. if (punishment.type === "banUserIp" && punishment.value === socket.ip)
  423. banishment.banned = true;
  424. });
  425. socket.banishment = banishment;
  426. next();
  427. })
  428. .catch(() => {
  429. next();
  430. });
  431. }
  432. ],
  433. () => {
  434. if (!socket.session) socket.session = { socketId: socket.id };
  435. else socket.session.socketId = socket.id;
  436. cb();
  437. resolve();
  438. }
  439. );
  440. });
  441. }
  442. /**
  443. * Handles io.connection
  444. *
  445. * @param {object} payload - object that contains the payload
  446. * @returns {Promise} - returns promise (reject, resolve)
  447. */
  448. async HANDLE_IO_CONNECTION(payload) {
  449. return new Promise(resolve => {
  450. const { socket } = payload;
  451. let sessionInfo = "";
  452. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  453. // if session is banned
  454. if (socket.banishment && socket.banishment.banned) {
  455. IOModule.log(
  456. "INFO",
  457. "IO_BANNED_CONNECTION",
  458. `A user tried to connect, but is currently banned. IP: ${socket.ip}.${sessionInfo}`
  459. );
  460. socket.emit("keep.event:banned", socket.banishment.ban);
  461. return socket.disconnect(true);
  462. }
  463. IOModule.log("INFO", "IO_CONNECTION", `User connected. IP: ${socket.ip}.${sessionInfo}`);
  464. // catch when the socket has been disconnected
  465. socket.on("disconnect", () => {
  466. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  467. IOModule.log("INFO", "IO_DISCONNECTION", `User disconnected. IP: ${socket.ip}.${sessionInfo}`);
  468. });
  469. socket.use((data, next) => {
  470. if (data.length === 0) return next(new Error("Not enough arguments specified."));
  471. if (typeof data[0] !== "string") return next(new Error("First argument must be a string."));
  472. const namespaceAction = data[0];
  473. if (
  474. !namespaceAction ||
  475. namespaceAction.indexOf(".") === -1 ||
  476. namespaceAction.indexOf(".") !== namespaceAction.lastIndexOf(".")
  477. )
  478. return next(new Error("Invalid first argument"));
  479. const namespace = data[0].split(".")[0];
  480. const action = data[0].split(".")[1];
  481. if (!namespace) return next(new Error("Invalid namespace."));
  482. if (!action) return next(new Error("Invalid action."));
  483. if (!IOModule.actions[namespace]) return next(new Error("Namespace not found."));
  484. if (!IOModule.actions[namespace][action]) return next(new Error("Action not found."));
  485. return next();
  486. });
  487. // catch errors on the socket (internal to socket.io)
  488. socket.on("error", console.error);
  489. if (socket.session.sessionId) {
  490. CacheModule.runJob("HGET", {
  491. table: "sessions",
  492. key: socket.session.sessionId
  493. })
  494. .then(session => {
  495. if (session && session.userId) {
  496. IOModule.userModel.findOne({ _id: session.userId }, (err, user) => {
  497. if (err || !user) return socket.emit("ready", false);
  498. let role = "";
  499. let username = "";
  500. let userId = "";
  501. if (user) {
  502. role = user.role;
  503. username = user.username;
  504. userId = session.userId;
  505. }
  506. return socket.emit("ready", true, role, username, userId);
  507. });
  508. } else socket.emit("ready", false);
  509. })
  510. .catch(() => {
  511. socket.emit("ready", false);
  512. });
  513. } else socket.emit("ready", false);
  514. // have the socket listen for each action
  515. Object.keys(IOModule.actions).forEach(namespace => {
  516. Object.keys(IOModule.actions[namespace]).forEach(action => {
  517. // the full name of the action
  518. const name = `${namespace}.${action}`;
  519. // listen for this action to be called
  520. socket.on(name, async (...args) => {
  521. IOModule.runJob("RUN_ACTION", { socket, namespace, action, args });
  522. /* let cb = args[args.length - 1];
  523. if (typeof cb !== "function")
  524. cb = () => {
  525. IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
  526. };
  527. else args.pop();
  528. if (this.getStatus() !== "READY") {
  529. IOModule.log(
  530. "INFO",
  531. "IO_REJECTED_ACTION",
  532. `A user tried to execute an action, but the IO module is currently not ready. Action: ${namespace}.${action}.`
  533. );
  534. return;
  535. }
  536. IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
  537. let failedGettingSession = false;
  538. // load the session from the cache
  539. if (socket.session.sessionId)
  540. await CacheModule.runJob("HGET", {
  541. table: "sessions",
  542. key: socket.session.sessionId
  543. })
  544. .then(session => {
  545. // make sure the sockets sessionId isn't set if there is no session
  546. if (socket.session.sessionId && session === null) delete socket.session.sessionId;
  547. })
  548. .catch(() => {
  549. failedGettingSession = true;
  550. if (typeof cb === "function")
  551. cb({
  552. status: "error",
  553. message: "An error occurred while obtaining your session"
  554. });
  555. });
  556. if (!failedGettingSession)
  557. try {
  558. // call the action, passing it the session, and the arguments socket.io passed us
  559. this.runJob("RUN_ACTION", { namespace, action, session: socket.session, args })
  560. .then(response => {
  561. if (typeof cb === "function") cb(response);
  562. })
  563. .catch(err => {
  564. if (typeof cb === "function") cb(err);
  565. });
  566. // actions[namespace][action].apply(
  567. // null,
  568. // [socket.session].concat(args).concat([
  569. // result => {
  570. // IOModule.log(
  571. // "INFO",
  572. // "IO_ACTION",
  573. // `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
  574. // );
  575. // // respond to the socket with our message
  576. // if (typeof cb === "function") cb(result);
  577. // }
  578. // ])
  579. // );
  580. } catch (err) {
  581. if (typeof cb === "function")
  582. cb({
  583. status: "error",
  584. message: "An error occurred while executing the specified action."
  585. });
  586. IOModule.log(
  587. "ERROR",
  588. "IO_ACTION_ERROR",
  589. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  590. );
  591. } */
  592. });
  593. });
  594. });
  595. return resolve();
  596. });
  597. }
  598. /**
  599. * Runs an action
  600. *
  601. * @param {object} payload - object that contains the payload
  602. * @returns {Promise} - returns promise (reject, resolve)
  603. */
  604. async RUN_ACTION(payload) {
  605. return new Promise((resolve, reject) => {
  606. const { socket, namespace, action, args } = payload;
  607. // the full name of the action
  608. const name = `${namespace}.${action}`;
  609. let cb = args[args.length - 1];
  610. if (typeof cb !== "function")
  611. cb = () => {
  612. IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
  613. };
  614. else args.pop();
  615. IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
  616. // load the session from the cache
  617. new Promise(resolve => {
  618. if (socket.session.sessionId)
  619. CacheModule.runJob("HGET", {
  620. table: "sessions",
  621. key: socket.session.sessionId
  622. })
  623. .then(session => {
  624. // make sure the sockets sessionId isn't set if there is no session
  625. if (socket.session.sessionId && session === null) delete socket.session.sessionId;
  626. resolve();
  627. })
  628. .catch(() => {
  629. if (typeof cb === "function")
  630. cb({
  631. status: "error",
  632. message: "An error occurred while obtaining your session"
  633. });
  634. reject(new Error("An error occurred while obtaining the session"));
  635. });
  636. else resolve();
  637. })
  638. .then(() => {
  639. // call the job that calls the action, passing it the session, and the arguments socket.io passed us
  640. IOModule.runJob("RUN_ACTION2", { session: socket.session, namespace, action, args }, this)
  641. .then(response => {
  642. cb(response);
  643. resolve();
  644. })
  645. .catch(err => {
  646. if (typeof cb === "function")
  647. cb({
  648. status: "error",
  649. message: "An error occurred while executing the specified action."
  650. });
  651. reject(err);
  652. IOModule.log(
  653. "ERROR",
  654. "IO_ACTION_ERROR",
  655. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  656. );
  657. });
  658. })
  659. .catch(reject);
  660. });
  661. }
  662. /**
  663. * Runs an action
  664. *
  665. * @param {object} payload - object that contains the payload
  666. * @returns {Promise} - returns promise (reject, resolve)
  667. */
  668. async RUN_ACTION2(payload) {
  669. return new Promise((resolve, reject) => {
  670. const { session, namespace, action, args } = payload;
  671. try {
  672. // call the the action, passing it the session, and the arguments socket.io passed us
  673. IOModule.actions[namespace][action].apply(
  674. this,
  675. [session].concat(args).concat([
  676. result => {
  677. IOModule.log(
  678. "INFO",
  679. "RUN_ACTION2",
  680. `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
  681. );
  682. resolve(result);
  683. }
  684. ])
  685. );
  686. } catch (err) {
  687. reject(err);
  688. IOModule.log(
  689. "ERROR",
  690. "IO_ACTION_ERROR",
  691. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  692. );
  693. }
  694. });
  695. }
  696. }
  // The default export is an instance created when this module is first evaluated;
  // its constructor also stores it in the module-level IOModule variable.
  export default new _IOModule();