io.js 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772
  1. /**
  2. * @file
  3. */
  4. import config from "config";
  5. import async from "async";
  6. import socketio from "socket.io";
  7. import CoreClass from "../core";
  // Module-level references shared by every job handler in this file.
  // IOModule is assigned in the _IOModule constructor; the remaining references
  // are assigned from this.moduleManager.modules inside initialize().
  8. let IOModule;
  9. let AppModule;
  10. let CacheModule;
  11. let UtilsModule;
  12. let DBModule;
  13. let PunishmentsModule;
  14. class _IOModule extends CoreClass {
  15. // eslint-disable-next-line require-jsdoc
  16. constructor() {
  17. super("io");
  18. IOModule = this;
  19. }
  20. /**
  21. * Initialises the io module
  22. *
  23. * @returns {Promise} - returns promise (reject, resolve)
  24. */
  25. async initialize() {
  26. this.setStage(1);
  27. AppModule = this.moduleManager.modules.app;
  28. CacheModule = this.moduleManager.modules.cache;
  29. UtilsModule = this.moduleManager.modules.utils;
  30. DBModule = this.moduleManager.modules.db;
  31. PunishmentsModule = this.moduleManager.modules.punishments;
  32. this.actions = (await import("./actions")).default;
  33. this.userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  34. this.setStage(2);
  35. this.SIDname = config.get("cookie.SIDname");
  36. // TODO: Check every 30s/, for all sockets, if they are still allowed to be in the rooms they are in, and on socket at all (permission changing/banning)
  37. const server = await AppModule.runJob("SERVER");
  38. this._io = socketio(server);
  39. return new Promise(resolve => {
  40. this.setStage(3);
  41. this._io.use(async (socket, cb) => {
  42. IOModule.runJob("HANDLE_IO_USE", { socket, cb });
  43. });
  44. this.setStage(4);
  45. this._io.on("connection", async socket => {
  46. IOModule.runJob("HANDLE_IO_CONNECTION", { socket });
  47. });
  48. this.setStage(5);
  49. return resolve();
  50. });
  51. }
  52. /**
  53. * Returns the socket io variable
  54. *
  55. * @returns {Promise} - returns a promise (resolve, reject)
  56. */
  57. IO() {
  58. return new Promise(resolve => {
  59. resolve(IOModule._io);
  60. });
  61. }
  62. /**
  63. * Returns whether there is a socket for a session id or not
  64. *
  65. * @param {object} payload - object containing the payload
  66. * @param {string} payload.sessionId - user session id
  67. * @returns {Promise} - returns promise (reject, resolve)
  68. */
  69. async SOCKET_FROM_SESSION(payload) {
  70. // socketId
  71. return new Promise((resolve, reject) => {
  72. const ns = IOModule._io.of("/");
  73. if (ns) {
  74. return resolve(ns.connected[payload.socketId]);
  75. }
  76. return reject();
  77. });
  78. }
  79. /**
  80. * Gets all sockets for a specified session id
  81. *
  82. * @param {object} payload - object containing the payload
  83. * @param {string} payload.sessionId - user session id
  84. * @returns {Promise} - returns promise (reject, resolve)
  85. */
  86. async SOCKETS_FROM_SESSION_ID(payload) {
  87. return new Promise(resolve => {
  88. const ns = IOModule._io.of("/");
  89. const sockets = [];
  90. if (ns) {
  91. return async.each(
  92. Object.keys(ns.connected),
  93. (id, next) => {
  94. const { session } = ns.connected[id];
  95. if (session.sessionId === payload.sessionId) sockets.push(session.sessionId);
  96. next();
  97. },
  98. () => {
  99. resolve({ sockets });
  100. }
  101. );
  102. }
  103. return resolve();
  104. });
  105. }
  106. /**
  107. * Returns any sockets for a specific user
  108. *
  109. * @param {object} payload - object that contains the payload
  110. * @param {string} payload.userId - the user id
  111. * @returns {Promise} - returns promise (reject, resolve)
  112. */
  113. async SOCKETS_FROM_USER(payload) {
  114. return new Promise((resolve, reject) => {
  115. const ns = IOModule._io.of("/");
  116. const sockets = [];
  117. if (ns) {
  118. return async.each(
  119. Object.keys(ns.connected),
  120. (id, next) => {
  121. const { session } = ns.connected[id];
  122. CacheModule.runJob(
  123. "HGET",
  124. {
  125. table: "sessions",
  126. key: session.sessionId
  127. },
  128. this
  129. )
  130. .then(session => {
  131. if (session && session.userId === payload.userId) sockets.push(ns.connected[id]);
  132. next();
  133. })
  134. .catch(err => {
  135. next(err);
  136. });
  137. },
  138. err => {
  139. if (err) return reject(err);
  140. return resolve({ sockets });
  141. }
  142. );
  143. }
  144. return resolve();
  145. });
  146. }
  147. /**
  148. * Returns any sockets from a specific ip address
  149. *
  150. * @param {object} payload - object that contains the payload
  151. * @param {string} payload.ip - the ip address in question
  152. * @returns {Promise} - returns promise (reject, resolve)
  153. */
  154. async SOCKETS_FROM_IP(payload) {
  155. return new Promise(resolve => {
  156. const ns = IOModule._io.of("/");
  157. const sockets = [];
  158. if (ns) {
  159. return async.each(
  160. Object.keys(ns.connected),
  161. (id, next) => {
  162. const { session } = ns.connected[id];
  163. CacheModule.runJob(
  164. "HGET",
  165. {
  166. table: "sessions",
  167. key: session.sessionId
  168. },
  169. this
  170. )
  171. .then(session => {
  172. if (session && ns.connected[id].ip === payload.ip) sockets.push(ns.connected[id]);
  173. next();
  174. })
  175. .catch(() => next());
  176. },
  177. () => {
  178. resolve({ sockets });
  179. }
  180. );
  181. }
  182. return resolve();
  183. });
  184. }
  185. /**
  186. * Returns any sockets from a specific user without using redis/cache
  187. *
  188. * @param {object} payload - object that contains the payload
  189. * @param {string} payload.userId - the id of the user in question
  190. * @returns {Promise} - returns promise (reject, resolve)
  191. */
  192. async SOCKETS_FROM_USER_WITHOUT_CACHE(payload) {
  193. return new Promise(resolve => {
  194. const ns = IOModule._io.of("/");
  195. const sockets = [];
  196. if (ns) {
  197. return async.each(
  198. Object.keys(ns.connected),
  199. (id, next) => {
  200. const { session } = ns.connected[id];
  201. if (session.userId === payload.userId) sockets.push(ns.connected[id]);
  202. next();
  203. },
  204. () => {
  205. resolve({ sockets });
  206. }
  207. );
  208. }
  209. return resolve();
  210. });
  211. }
  212. /**
  213. * Allows a socket to leave any rooms they are connected to
  214. *
  215. * @param {object} payload - object that contains the payload
  216. * @param {string} payload.socketId - the id of the socket which should leave all their rooms
  217. * @returns {Promise} - returns promise (reject, resolve)
  218. */
  219. async SOCKET_LEAVE_ROOMS(payload) {
  220. const socket = await IOModule.runJob(
  221. "SOCKET_FROM_SESSION",
  222. {
  223. socketId: payload.socketId
  224. },
  225. this
  226. );
  227. return new Promise(resolve => {
  228. const { rooms } = socket;
  229. Object.keys(rooms).forEach(roomKey => {
  230. const room = rooms[roomKey];
  231. socket.leave(room);
  232. });
  233. return resolve();
  234. });
  235. }
  236. /**
  237. * Allows a socket to join a specified room
  238. *
  239. * @param {object} payload - object that contains the payload
  240. * @param {string} payload.socketId - the id of the socket which should join the room
  241. * @param {object} payload.room - the object representing the room the socket should join
  242. * @returns {Promise} - returns promise (reject, resolve)
  243. */
  244. async SOCKET_JOIN_ROOM(payload) {
  245. const socket = await IOModule.runJob(
  246. "SOCKET_FROM_SESSION",
  247. {
  248. socketId: payload.socketId
  249. },
  250. this
  251. );
  252. return new Promise(resolve => {
  253. const { rooms } = socket;
  254. Object.keys(rooms).forEach(roomKey => {
  255. const room = rooms[roomKey];
  256. socket.leave(room);
  257. });
  258. socket.join(payload.room);
  259. return resolve();
  260. });
  261. }
  262. // UNKNOWN
  263. // eslint-disable-next-line require-jsdoc
  264. async SOCKET_JOIN_SONG_ROOM(payload) {
  265. // socketId, room
  266. const socket = await IOModule.runJob(
  267. "SOCKET_FROM_SESSION",
  268. {
  269. socketId: payload.socketId
  270. },
  271. this
  272. );
  273. return new Promise(resolve => {
  274. const { rooms } = socket;
  275. Object.keys(rooms).forEach(roomKey => {
  276. const room = rooms[roomKey];
  277. if (room.indexOf("song.") !== -1) socket.leave(room);
  278. });
  279. socket.join(payload.room);
  280. return resolve();
  281. });
  282. }
  283. // UNKNOWN
  284. // eslint-disable-next-line require-jsdoc
  285. SOCKETS_JOIN_SONG_ROOM(payload) {
  286. // sockets, room
  287. return new Promise(resolve => {
  288. Object.keys(payload.sockets).forEach(socketKey => {
  289. const socket = payload.sockets[socketKey];
  290. const { rooms } = socket;
  291. Object.keys(rooms).forEach(roomKey => {
  292. const room = rooms[roomKey];
  293. if (room.indexOf("song.") !== -1) socket.leave(room);
  294. });
  295. socket.join(payload.room);
  296. });
  297. return resolve();
  298. });
  299. }
  300. // UNKNOWN
  301. // eslint-disable-next-line require-jsdoc
  302. SOCKETS_LEAVE_SONG_ROOMS(payload) {
  303. // sockets
  304. return new Promise(resolve => {
  305. Object.keys(payload.sockets).forEach(socketKey => {
  306. const socket = payload.sockets[socketKey];
  307. const { rooms } = socket;
  308. Object.keys(rooms).forEach(roomKey => {
  309. const room = rooms[roomKey];
  310. if (room.indexOf("song.") !== -1) socket.leave(room);
  311. });
  312. });
  313. resolve();
  314. });
  315. }
  316. /**
  317. * Emits arguments to any sockets that are in a specified a room
  318. *
  319. * @param {object} payload - object that contains the payload
  320. * @param {string} payload.room - the name of the room to emit arguments
  321. * @param {object} payload.args - any arguments to be emitted to the sockets in the specific room
  322. * @returns {Promise} - returns promise (reject, resolve)
  323. */
  324. async EMIT_TO_ROOM(payload) {
  325. return new Promise(resolve => {
  326. const { sockets } = IOModule._io.sockets;
  327. Object.keys(sockets).forEach(socketKey => {
  328. const socket = sockets[socketKey];
  329. if (socket.rooms[payload.room]) {
  330. socket.emit(...payload.args);
  331. }
  332. });
  333. return resolve();
  334. });
  335. }
  336. /**
  337. * Gets any sockets connected to a room
  338. *
  339. * @param {object} payload - object that contains the payload
  340. * @param {string} payload.room - the name of the room
  341. * @returns {Promise} - returns promise (reject, resolve)
  342. */
  343. async GET_ROOM_SOCKETS(payload) {
  344. return new Promise(resolve => {
  345. const { sockets } = IOModule._io.sockets;
  346. const roomSockets = [];
  347. Object.keys(sockets).forEach(socketKey => {
  348. const socket = sockets[socketKey];
  349. if (socket.rooms[payload.room]) roomSockets.push(socket);
  350. });
  351. return resolve(roomSockets);
  352. });
  353. }
  354. /**
  355. * Handles io.use
  356. *
  357. * @param {object} payload - object that contains the payload
  358. * @returns {Promise} - returns promise (reject, resolve)
  359. */
  360. async HANDLE_IO_USE(payload) {
  361. return new Promise(resolve => {
  362. const { socket, cb } = payload;
  363. let SID;
  364. socket.ip = socket.request.headers["x-forwarded-for"] || "0.0.0.0";
  365. return async.waterfall(
  366. [
  367. next => {
  368. UtilsModule.runJob(
  369. "PARSE_COOKIES",
  370. {
  371. cookieString: socket.request.headers.cookie
  372. },
  373. this
  374. ).then(res => {
  375. SID = res[IOModule.SIDname];
  376. next(null);
  377. });
  378. },
  379. next => {
  380. if (!SID) return next("No SID.");
  381. return next();
  382. },
  383. next => {
  384. CacheModule.runJob("HGET", { table: "sessions", key: SID }, this)
  385. .then(session => {
  386. next(null, session);
  387. })
  388. .catch(next);
  389. },
  390. (session, next) => {
  391. if (!session) return next("No session found.");
  392. session.refreshDate = Date.now();
  393. socket.session = session;
  394. return CacheModule.runJob(
  395. "HSET",
  396. {
  397. table: "sessions",
  398. key: SID,
  399. value: session
  400. },
  401. this
  402. ).then(session => {
  403. next(null, session);
  404. });
  405. },
  406. (res, next) => {
  407. // check if a session's user / IP is banned
  408. PunishmentsModule.runJob("GET_PUNISHMENTS", {}, this)
  409. .then(punishments => {
  410. const isLoggedIn = !!(socket.session && socket.session.refreshDate);
  411. const userId = isLoggedIn ? socket.session.userId : null;
  412. const banishment = {
  413. banned: false,
  414. ban: 0
  415. };
  416. punishments.forEach(punishment => {
  417. if (punishment.expiresAt > banishment.ban) banishment.ban = punishment;
  418. if (punishment.type === "banUserId" && isLoggedIn && punishment.value === userId)
  419. banishment.banned = true;
  420. if (punishment.type === "banUserIp" && punishment.value === socket.ip)
  421. banishment.banned = true;
  422. });
  423. socket.banishment = banishment;
  424. next();
  425. })
  426. .catch(() => {
  427. next();
  428. });
  429. }
  430. ],
  431. () => {
  432. if (!socket.session) socket.session = { socketId: socket.id };
  433. else socket.session.socketId = socket.id;
  434. cb();
  435. resolve();
  436. }
  437. );
  438. });
  439. }
  440. /**
  441. * Handles io.connection
  442. *
  443. * @param {object} payload - object that contains the payload
  444. * @returns {Promise} - returns promise (reject, resolve)
  445. */
  446. async HANDLE_IO_CONNECTION(payload) {
  447. return new Promise(resolve => {
  448. const { socket } = payload;
  449. let sessionInfo = "";
  450. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  451. // if session is banned
  452. if (socket.banishment && socket.banishment.banned) {
  453. IOModule.log(
  454. "INFO",
  455. "IO_BANNED_CONNECTION",
  456. `A user tried to connect, but is currently banned. IP: ${socket.ip}.${sessionInfo}`
  457. );
  458. socket.emit("keep.event:banned", socket.banishment.ban);
  459. return socket.disconnect(true);
  460. }
  461. IOModule.log("INFO", "IO_CONNECTION", `User connected. IP: ${socket.ip}.${sessionInfo}`);
  462. // catch when the socket has been disconnected
  463. socket.on("disconnect", () => {
  464. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  465. IOModule.log("INFO", "IO_DISCONNECTION", `User disconnected. IP: ${socket.ip}.${sessionInfo}`);
  466. });
  467. socket.use((data, next) => {
  468. if (data.length === 0) return next(new Error("Not enough arguments specified."));
  469. if (typeof data[0] !== "string") return next(new Error("First argument must be a string."));
  470. const namespaceAction = data[0];
  471. if (
  472. !namespaceAction ||
  473. namespaceAction.indexOf(".") === -1 ||
  474. namespaceAction.indexOf(".") !== namespaceAction.lastIndexOf(".")
  475. )
  476. return next(new Error("Invalid first argument"));
  477. const namespace = data[0].split(".")[0];
  478. const action = data[0].split(".")[1];
  479. if (!namespace) return next(new Error("Invalid namespace."));
  480. if (!action) return next(new Error("Invalid action."));
  481. if (!IOModule.actions[namespace]) return next(new Error("Namespace not found."));
  482. if (!IOModule.actions[namespace][action]) return next(new Error("Action not found."));
  483. return next();
  484. });
  485. // catch errors on the socket (internal to socket.io)
  486. socket.on("error", console.error);
  487. if (socket.session.sessionId) {
  488. CacheModule.runJob("HGET", {
  489. table: "sessions",
  490. key: socket.session.sessionId
  491. })
  492. .then(session => {
  493. if (session && session.userId) {
  494. IOModule.userModel.findOne({ _id: session.userId }, (err, user) => {
  495. if (err || !user) return socket.emit("ready", false);
  496. let role = "";
  497. let username = "";
  498. let userId = "";
  499. if (user) {
  500. role = user.role;
  501. username = user.username;
  502. userId = session.userId;
  503. }
  504. return socket.emit("ready", true, role, username, userId);
  505. });
  506. } else socket.emit("ready", false);
  507. })
  508. .catch(() => {
  509. socket.emit("ready", false);
  510. });
  511. } else socket.emit("ready", false);
  512. // have the socket listen for each action
  513. Object.keys(IOModule.actions).forEach(namespace => {
  514. Object.keys(IOModule.actions[namespace]).forEach(action => {
  515. // the full name of the action
  516. const name = `${namespace}.${action}`;
  517. // listen for this action to be called
  518. socket.on(name, async (...args) => {
  519. IOModule.runJob("RUN_ACTION", { socket, namespace, action, args });
  520. /* let cb = args[args.length - 1];
  521. if (typeof cb !== "function")
  522. cb = () => {
  523. IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
  524. };
  525. else args.pop();
  526. if (this.getStatus() !== "READY") {
  527. IOModule.log(
  528. "INFO",
  529. "IO_REJECTED_ACTION",
  530. `A user tried to execute an action, but the IO module is currently not ready. Action: ${namespace}.${action}.`
  531. );
  532. return;
  533. }
  534. IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
  535. let failedGettingSession = false;
  536. // load the session from the cache
  537. if (socket.session.sessionId)
  538. await CacheModule.runJob("HGET", {
  539. table: "sessions",
  540. key: socket.session.sessionId
  541. })
  542. .then(session => {
  543. // make sure the sockets sessionId isn't set if there is no session
  544. if (socket.session.sessionId && session === null) delete socket.session.sessionId;
  545. })
  546. .catch(() => {
  547. failedGettingSession = true;
  548. if (typeof cb === "function")
  549. cb({
  550. status: "error",
  551. message: "An error occurred while obtaining your session"
  552. });
  553. });
  554. if (!failedGettingSession)
  555. try {
  556. // call the action, passing it the session, and the arguments socket.io passed us
  557. this.runJob("RUN_ACTION", { namespace, action, session: socket.session, args })
  558. .then(response => {
  559. if (typeof cb === "function") cb(response);
  560. })
  561. .catch(err => {
  562. if (typeof cb === "function") cb(err);
  563. });
  564. // actions[namespace][action].apply(
  565. // null,
  566. // [socket.session].concat(args).concat([
  567. // result => {
  568. // IOModule.log(
  569. // "INFO",
  570. // "IO_ACTION",
  571. // `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
  572. // );
  573. // // respond to the socket with our message
  574. // if (typeof cb === "function") cb(result);
  575. // }
  576. // ])
  577. // );
  578. } catch (err) {
  579. if (typeof cb === "function")
  580. cb({
  581. status: "error",
  582. message: "An error occurred while executing the specified action."
  583. });
  584. IOModule.log(
  585. "ERROR",
  586. "IO_ACTION_ERROR",
  587. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  588. );
  589. } */
  590. });
  591. });
  592. });
  593. return resolve();
  594. });
  595. }
  596. /**
  597. * Runs an action
  598. *
  599. * @param {object} payload - object that contains the payload
  600. * @returns {Promise} - returns promise (reject, resolve)
  601. */
  602. async RUN_ACTION(payload) {
  603. return new Promise((resolve, reject) => {
  604. const { socket, namespace, action, args } = payload;
  605. // the full name of the action
  606. const name = `${namespace}.${action}`;
  607. let cb = args[args.length - 1];
  608. if (typeof cb !== "function")
  609. cb = () => {
  610. IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
  611. };
  612. else args.pop();
  613. IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
  614. // load the session from the cache
  615. new Promise(resolve => {
  616. if (socket.session.sessionId)
  617. CacheModule.runJob("HGET", {
  618. table: "sessions",
  619. key: socket.session.sessionId
  620. })
  621. .then(session => {
  622. // make sure the sockets sessionId isn't set if there is no session
  623. if (socket.session.sessionId && session === null) delete socket.session.sessionId;
  624. resolve();
  625. })
  626. .catch(() => {
  627. if (typeof cb === "function")
  628. cb({
  629. status: "error",
  630. message: "An error occurred while obtaining your session"
  631. });
  632. reject(new Error("An error occurred while obtaining the session"));
  633. });
  634. })
  635. .then(() => {
  636. try {
  637. // call the action, passing it the session, and the arguments socket.io passed us
  638. IOModule.actions[namespace][action].apply(
  639. this,
  640. [socket.session].concat(args).concat([
  641. result => {
  642. IOModule.log(
  643. "INFO",
  644. "RUN_ACTION",
  645. `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
  646. );
  647. // respond to the socket with our message
  648. if (typeof cb === "function") cb(result);
  649. resolve();
  650. }
  651. ])
  652. );
  653. } catch (err) {
  654. if (typeof cb === "function")
  655. cb({
  656. status: "error",
  657. message: "An error occurred while executing the specified action."
  658. });
  659. reject(err);
  660. IOModule.log(
  661. "ERROR",
  662. "IO_ACTION_ERROR",
  663. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  664. );
  665. }
  666. })
  667. .catch(reject);
  668. });
  669. }
  670. }
  // Export a single shared instance; its constructor also stores it in the module-level IOModule reference.
  671. export default new _IOModule();