/**
 * @file io.js - WebSocket ("ws") based IO module. Accepts client connections on the "/ws" path,
 * attaches the cached session and ban information to each socket, tracks room membership and
 * forwards incoming "namespace.action" messages to the handlers loaded from ./actions.
 */
  4. import config from "config";
  5. import async from "async";
  6. import WebSocket from "ws";
  7. import { EventEmitter } from "events";
  8. import CoreClass from "../core";
  // Module-level handles shared by every job method in this file.
  // IOModule is assigned in the constructor; the remaining handles are assigned in initialize()
  // from this.moduleManager.modules, so they are undefined until initialize() has run.
  9. let IOModule;
  10. let AppModule;
  11. let CacheModule;
  12. let UtilsModule;
  13. let DBModule;
  14. let PunishmentsModule;
  15. class _IOModule extends CoreClass {
  16. // eslint-disable-next-line require-jsdoc
  17. constructor() {
  // Pass the module name "io" to the CoreClass constructor.
  18. super("io");
  // Keep a module-level reference to this instance; the job methods below use IOModule
  // rather than `this` to reach the module's state.
  19. IOModule = this;
  20. }
  21. /**
  22. * Initialises the io module
  23. *
  24. * @returns {Promise} - returns promise (reject, resolve)
  25. */
  26. async initialize() {
  27. this.setStage(1);
  28. AppModule = this.moduleManager.modules.app;
  29. CacheModule = this.moduleManager.modules.cache;
  30. UtilsModule = this.moduleManager.modules.utils;
  31. DBModule = this.moduleManager.modules.db;
  32. PunishmentsModule = this.moduleManager.modules.punishments;
  33. this.actions = (await import("./actions")).default;
  34. this.userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  35. this.setStage(2);
  36. this.SIDname = config.get("cookie.SIDname");
  37. // TODO: Check every 30s/, for all sockets, if they are still allowed to be in the rooms they are in, and on socket at all (permission changing/banning)
  38. const server = await AppModule.runJob("SERVER");
  39. // this._io.origins(config.get("cors.origin"));
  40. this._io = new WebSocket.Server({ server, path: "/ws" });
  41. this.rooms = {};
  42. return new Promise(resolve => {
  43. this.setStage(3);
  44. this.setStage(4);
  45. this._io.on("connection", async (socket, req) => {
  46. socket.dispatch = (...args) => socket.send(JSON.stringify(args));
  47. console.log(socket.actions);
  48. socket.actions = new EventEmitter();
  49. socket.actions.setMaxListeners(0);
  50. socket.listen = (target, cb) => socket.actions.addListener(target, args => cb(args));
  51. IOModule.runJob("HANDLE_IO_USE", { socket, req }).then(socket =>
  52. IOModule.runJob("HANDLE_IO_CONNECTION", { socket })
  53. );
  54. });
  55. this.setStage(5);
  56. return resolve();
  57. });
  58. }
  59. /**
  60. * Returns the socket io variable
  61. *
  62. * @returns {Promise} - returns a promise (resolve, reject)
  63. */
  64. IO() {
  65. return new Promise(resolve => {
  66. resolve(IOModule._io);
  67. });
  68. }
  69. /**
  70. * Obtains socket object for a specified socket id
  71. *
  72. * @param {object} payload - object containing the payload
  73. * @param {string} payload.socketId - the id of the socket
  74. * @returns {Promise} - returns promise (reject, resolve)
  75. */
  76. async SOCKET_FROM_SOCKET_ID(payload) {
  77. return new Promise((resolve, reject) => {
  78. const { clients } = IOModule._io;
  79. if (clients) {
  80. return clients.forEach(socket => {
  81. if (socket.session.socketId === payload.socketId) resolve(socket);
  82. });
  83. }
  84. return reject();
  85. });
  86. }
  87. /**
  88. * Gets all sockets for a specified session id
  89. *
  90. * @param {object} payload - object containing the payload
  91. * @param {string} payload.sessionId - user session id
  92. * @returns {Promise} - returns promise (reject, resolve)
  93. */
  94. async SOCKETS_FROM_SESSION_ID(payload) {
  95. return new Promise(resolve => {
  96. const { clients } = IOModule._io;
  97. const sockets = [];
  98. if (clients) {
  99. return async.each(
  100. Object.keys(clients),
  101. (id, next) => {
  102. const { session } = clients[id];
  103. if (session.sessionId === payload.sessionId) sockets.push(session.sessionId);
  104. next();
  105. },
  106. () => resolve(sockets)
  107. );
  108. }
  109. return resolve();
  110. });
  111. }
  112. /**
  113. * Returns any sockets for a specific user
  114. *
  115. * @param {object} payload - object that contains the payload
  116. * @param {string} payload.userId - the user id
  117. * @returns {Promise} - returns promise (reject, resolve)
  118. */
  119. async SOCKETS_FROM_USER(payload) {
  120. return new Promise((resolve, reject) => {
  121. const sockets = [];
  122. return async.eachLimit(
  123. IOModule._io.clients,
  124. 1,
  125. (socket, next) => {
  126. const { sessionId } = socket.session;
  127. if (sessionId) {
  128. return CacheModule.runJob("HGET", { table: "sessions", key: sessionId }, this)
  129. .then(session => {
  130. if (session && session.userId === payload.userId) sockets.push(socket);
  131. next();
  132. })
  133. .catch(err => next(err));
  134. }
  135. return next();
  136. },
  137. err => {
  138. if (err) return reject(err);
  139. return resolve(sockets);
  140. }
  141. );
  142. });
  143. }
  144. /**
  145. * Returns any sockets from a specific ip address
  146. *
  147. * @param {object} payload - object that contains the payload
  148. * @param {string} payload.ip - the ip address in question
  149. * @returns {Promise} - returns promise (reject, resolve)
  150. */
  151. async SOCKETS_FROM_IP(payload) {
  152. return new Promise(resolve => {
  153. const { clients } = IOModule._io;
  154. const sockets = [];
  155. return async.each(
  156. Object.keys(clients),
  157. (id, next) => {
  158. const { session } = clients[id];
  159. CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId }, this)
  160. .then(session => {
  161. if (session && clients[id].ip === payload.ip) sockets.push(clients[id]);
  162. next();
  163. })
  164. .catch(() => next());
  165. },
  166. () => resolve(sockets)
  167. );
  168. });
  169. }
  170. /**
  171. * Returns any sockets from a specific user without using redis/cache
  172. *
  173. * @param {object} payload - object that contains the payload
  174. * @param {string} payload.userId - the id of the user in question
  175. * @returns {Promise} - returns promise (reject, resolve)
  176. */
  177. async SOCKETS_FROM_USER_WITHOUT_CACHE(payload) {
  178. return new Promise(resolve => {
  179. const { clients } = IOModule._io;
  180. const sockets = [];
  181. if (clients) {
  182. return async.each(
  183. Object.keys(clients),
  184. (id, next) => {
  185. const { session } = clients[id];
  186. if (session.userId === payload.userId) sockets.push(clients[id]);
  187. next();
  188. },
  189. () => resolve(sockets)
  190. );
  191. }
  192. return resolve();
  193. });
  194. }
  195. /**
  196. * Allows a socket to leave any rooms they are connected to
  197. *
  198. * @param {object} payload - object that contains the payload
  199. * @param {string} payload.socketId - the id of the socket which should leave all their rooms
  200. * @returns {Promise} - returns promise (reject, resolve)
  201. */
  202. async SOCKET_LEAVE_ROOMS(payload) {
  203. return new Promise(resolve => {
  204. // filter out rooms that the user is in
  205. Object.keys(IOModule.rooms).forEach(room => {
  206. IOModule.rooms[room] = IOModule.rooms[room].filter(participant => participant !== payload.socketId);
  207. });
  208. return resolve();
  209. });
  210. }
  211. /**
  212. * Allows a socket to join a specified room (this will remove them from any rooms they are currently in)
  213. *
  214. * @param {object} payload - object that contains the payload
  215. * @param {string} payload.socketId - the id of the socket which should join the room
  216. * @param {object} payload.room - the object representing the room the socket should join
  217. * @returns {Promise} - returns promise (reject, resolve)
  218. */
  219. async SOCKET_JOIN_ROOM(payload) {
  220. const { room, socketId } = payload;
  221. console.log("JOIN_ROOM", payload.room);
  222. // leave all other rooms
  223. await IOModule.runJob("SOCKET_LEAVE_ROOMS", { socketId }, this);
  224. return new Promise(resolve => {
  225. // create room if it doesn't exist, and add socketId to array
  226. if (IOModule.rooms[room]) IOModule.rooms[room].push(socketId);
  227. else IOModule.rooms[room] = [socketId];
  228. return resolve();
  229. });
  230. }
  231. /**
  232. * Emits arguments to any sockets that are in a specified a room
  233. *
  234. * @param {object} payload - object that contains the payload
  235. * @param {string} payload.room - the name of the room to emit arguments
  236. * @param {object} payload.args - any arguments to be emitted to the sockets in the specific room
  237. * @returns {Promise} - returns promise (reject, resolve)
  238. */
  239. async EMIT_TO_ROOM(payload) {
  240. return new Promise(resolve => {
  241. if (IOModule.rooms[payload.room])
  242. return IOModule.rooms[payload.room].forEach(async socketId => {
  243. const socket = await IOModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this);
  244. socket.dispatch(...payload.args);
  245. });
  246. return resolve();
  247. });
  248. }
  249. // UNKNOWN
  250. // eslint-disable-next-line require-jsdoc
  251. async SOCKET_JOIN_SONG_ROOM(payload) {
  252. // socketId, room
  253. const socket = await IOModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId: payload.socketId }, this);
  254. return new Promise(resolve => {
  255. const { rooms } = socket;
  256. Object.keys(rooms).forEach(roomKey => {
  257. const room = rooms[roomKey];
  258. if (room.indexOf("song.") !== -1) socket.leave(room);
  259. });
  260. socket.join(payload.room);
  261. return resolve();
  262. });
  263. }
  264. // UNKNOWN
  265. // eslint-disable-next-line require-jsdoc
  266. SOCKETS_JOIN_SONG_ROOM(payload) {
  267. // sockets, room
  268. return new Promise(resolve => {
  269. Object.keys(payload.sockets).forEach(socketKey => {
  270. const socket = payload.sockets[socketKey];
  271. const { rooms } = socket;
  272. Object.keys(rooms).forEach(roomKey => {
  273. const room = rooms[roomKey];
  274. if (room.indexOf("song.") !== -1) socket.leave(room);
  275. });
  276. socket.join(payload.room);
  277. });
  278. return resolve();
  279. });
  280. }
  281. // UNKNOWN
  282. // eslint-disable-next-line require-jsdoc
  283. SOCKETS_LEAVE_SONG_ROOMS(payload) {
  284. // sockets
  285. return new Promise(resolve => {
  286. Object.keys(payload.sockets).forEach(socketKey => {
  287. const socket = payload.sockets[socketKey];
  288. const { rooms } = socket;
  289. Object.keys(rooms).forEach(roomKey => {
  290. const room = rooms[roomKey];
  291. if (room.indexOf("song.") !== -1) socket.leave(room);
  292. });
  293. });
  294. resolve();
  295. });
  296. }
  297. /**
  298. * Gets any sockets connected to a room
  299. *
  300. * @param {object} payload - object that contains the payload
  301. * @param {string} payload.room - the name of the room
  302. * @returns {Promise} - returns promise (reject, resolve)
  303. */
  304. async GET_ROOM_SOCKETS(payload) {
  305. return new Promise(resolve => {
  306. if (IOModule.rooms[payload.room]) return resolve(IOModule.rooms[payload.room]);
  307. return resolve([]);
  308. });
  309. }
  310. /**
  311. * Handles io.use
  312. *
  313. * @param {object} payload - object that contains the payload
  314. * @returns {Promise} - returns promise (reject, resolve)
  315. */
  316. async HANDLE_IO_USE(payload) {
  317. console.log("io use");
  318. return new Promise(resolve => {
  319. const { socket, req } = payload;
  320. let SID;
  321. socket.ip = req.headers["x-forwarded-for"] || "0.0.0.0";
  322. return async.waterfall(
  323. [
  324. next => {
  325. if (!req.headers.cookie) return next("No cookie exists yet.");
  326. return UtilsModule.runJob("PARSE_COOKIES", { cookieString: req.headers.cookie }, this).then(
  327. res => {
  328. SID = res[IOModule.SIDname];
  329. next(null);
  330. }
  331. );
  332. },
  333. next => {
  334. if (!SID) return next("No SID.");
  335. return next();
  336. },
  337. next => {
  338. CacheModule.runJob("HGET", { table: "sessions", key: SID }, this)
  339. .then(session => next(null, session))
  340. .catch(next);
  341. },
  342. (session, next) => {
  343. if (!session) return next("No session found.");
  344. session.refreshDate = Date.now();
  345. socket.session = session;
  346. return CacheModule.runJob(
  347. "HSET",
  348. { table: "sessions", key: SID, value: session },
  349. this
  350. ).then(session => next(null, session));
  351. },
  352. (res, next) => {
  353. // check if a session's user / IP is banned
  354. PunishmentsModule.runJob("GET_PUNISHMENTS", {}, this)
  355. .then(punishments => {
  356. const isLoggedIn = !!(socket.session && socket.session.refreshDate);
  357. const userId = isLoggedIn ? socket.session.userId : null;
  358. const banishment = {
  359. banned: false,
  360. ban: 0
  361. };
  362. punishments.forEach(punishment => {
  363. if (punishment.expiresAt > banishment.ban) banishment.ban = punishment;
  364. if (punishment.type === "banUserId" && isLoggedIn && punishment.value === userId)
  365. banishment.banned = true;
  366. if (punishment.type === "banUserIp" && punishment.value === socket.ip)
  367. banishment.banned = true;
  368. });
  369. socket.banishment = banishment;
  370. next();
  371. })
  372. .catch(() => {
  373. next();
  374. });
  375. }
  376. ],
  377. () => {
  378. if (!socket.session) socket.session = { socketId: req.headers["sec-websocket-key"] };
  379. else socket.session.socketId = req.headers["sec-websocket-key"];
  380. // cb();
  381. resolve(socket);
  382. }
  383. );
  384. });
  385. }
  386. /**
  387. * Handles io.connection
  388. *
  389. * @param {object} payload - object that contains the payload
  390. * @returns {Promise} - returns promise (reject, resolve)
  391. */
  392. async HANDLE_IO_CONNECTION(payload) {
  393. console.log("handle io connection");
  394. return new Promise(resolve => {
  395. const { socket } = payload;
  396. let sessionInfo = "";
  397. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  398. // if session is banned
  399. if (socket.banishment && socket.banishment.banned) {
  400. IOModule.log(
  401. "INFO",
  402. "IO_BANNED_CONNECTION",
  403. `A user tried to connect, but is currently banned. IP: ${socket.ip}.${sessionInfo}`
  404. );
  405. socket.dispatch("keep.event:banned", socket.banishment.ban);
  406. return socket.disconnect(true); // need to fix
  407. }
  408. IOModule.log("INFO", "IO_CONNECTION", `User connected. IP: ${socket.ip}.${sessionInfo}`);
  409. // catch when the socket has been disconnected
  410. socket.on("close", async () => {
  411. if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
  412. IOModule.log("INFO", "IO_DISCONNECTION", `User disconnected. IP: ${socket.ip}.${sessionInfo}`);
  413. await IOModule.runJob("SOCKET_LEAVE_ROOMS", { socketId: socket.session.socketId });
  414. });
  415. // socket.use((data, next) => {
  416. // if (data.length === 0) return next(new Error("Not enough arguments specified."));
  417. // if (typeof data[0] !== "string") return next(new Error("First argument must be a string."));
  418. // const namespaceAction = data[0];
  419. // if (
  420. // !namespaceAction ||
  421. // namespaceAction.indexOf(".") === -1 ||
  422. // namespaceAction.indexOf(".") !== namespaceAction.lastIndexOf(".")
  423. // )
  424. // return next(new Error("Invalid first argument"));
  425. // const namespace = data[0].split(".")[0];
  426. // const action = data[0].split(".")[1];
  427. // if (!namespace) return next(new Error("Invalid namespace."));
  428. // if (!action) return next(new Error("Invalid action."));
  429. // if (!IOModule.actions[namespace]) return next(new Error("Namespace not found."));
  430. // if (!IOModule.actions[namespace][action]) return next(new Error("Action not found."));
  431. // return next();
  432. // });
  433. // catch errors on the socket (internal to socket.io)
  434. socket.onerror = console.error; // need to update
  435. if (socket.session.sessionId) {
  436. CacheModule.runJob("HGET", {
  437. table: "sessions",
  438. key: socket.session.sessionId
  439. })
  440. .then(session => {
  441. if (session && session.userId) {
  442. IOModule.userModel.findOne({ _id: session.userId }, (err, user) => {
  443. if (err || !user) return socket.dispatch("ready", false);
  444. let role = "";
  445. let username = "";
  446. let userId = "";
  447. if (user) {
  448. role = user.role;
  449. username = user.username;
  450. userId = session.userId;
  451. }
  452. return socket.dispatch("ready", true, role, username, userId);
  453. });
  454. } else socket.dispatch("ready", false);
  455. })
  456. .catch(() => {
  457. socket.dispatch("ready", false);
  458. });
  459. } else socket.dispatch("ready", false);
  460. // have the socket listen for each action
  461. Object.keys(IOModule.actions).forEach(namespace => {
  462. Object.keys(IOModule.actions[namespace]).forEach(action => {
  463. // the full name of the action
  464. const name = `${namespace}.${action}`;
  465. socket.onmessage = message => {
  466. const data = JSON.parse(message.data);
  467. if (data[data.length - 1].callbackRef) {
  468. const { callbackRef } = data[data.length - 1];
  469. data.pop();
  470. return socket.actions.emit(data.shift(0), [
  471. ...data,
  472. res => socket.dispatch("callbackRef", callbackRef, res)
  473. ]);
  474. }
  475. return socket.actions.emit(data.shift(0), data);
  476. };
  477. // listen for this action to be called
  478. socket.listen(name, async args =>
  479. IOModule.runJob("RUN_ACTION", { socket, namespace, action, args })
  480. );
  481. });
  482. });
  483. return resolve();
  484. });
  485. }
  486. /**
  487. * Runs an action
  488. *
  489. * @param {object} payload - object that contains the payload
  490. * @returns {Promise} - returns promise (reject, resolve)
  491. */
  492. async RUN_ACTION(payload) {
  493. return new Promise((resolve, reject) => {
  494. const { socket, namespace, action, args } = payload;
  495. // the full name of the action
  496. const name = `${namespace}.${action}`;
  497. let cb = args[args.length - 1];
  498. if (typeof cb !== "function")
  499. cb = () => {
  500. IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
  501. };
  502. else args.pop();
  503. IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
  504. // load the session from the cache
  505. new Promise(resolve => {
  506. if (socket.session.sessionId)
  507. CacheModule.runJob("HGET", {
  508. table: "sessions",
  509. key: socket.session.sessionId
  510. })
  511. .then(session => {
  512. // make sure the sockets sessionId isn't set if there is no session
  513. if (socket.session.sessionId && session === null) delete socket.session.sessionId;
  514. resolve();
  515. })
  516. .catch(() => {
  517. if (typeof cb === "function")
  518. cb({
  519. status: "error",
  520. message: "An error occurred while obtaining your session"
  521. });
  522. reject(new Error("An error occurred while obtaining the session"));
  523. });
  524. else resolve();
  525. })
  526. .then(() => {
  527. // call the job that calls the action, passing it the session, and the arguments socket.io passed us
  528. IOModule.runJob("RUN_ACTION2", { session: socket.session, namespace, action, args }, this)
  529. .then(response => {
  530. cb(response);
  531. resolve();
  532. })
  533. .catch(err => {
  534. if (typeof cb === "function")
  535. cb({
  536. status: "error",
  537. message: "An error occurred while executing the specified action."
  538. });
  539. reject(err);
  540. IOModule.log(
  541. "ERROR",
  542. "IO_ACTION_ERROR",
  543. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  544. );
  545. });
  546. })
  547. .catch(reject);
  548. });
  549. }
  550. /**
  551. * Runs an action
  552. *
  553. * @param {object} payload - object that contains the payload
  554. * @returns {Promise} - returns promise (reject, resolve)
  555. */
  556. async RUN_ACTION2(payload) {
  557. return new Promise((resolve, reject) => {
  558. const { session, namespace, action, args } = payload;
  559. try {
  560. // call the the action, passing it the session, and the arguments socket.io passed us
  561. IOModule.actions[namespace][action].apply(
  562. this,
  563. [session].concat(args).concat([
  564. result => {
  565. IOModule.log(
  566. "INFO",
  567. "RUN_ACTION2",
  568. `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
  569. );
  570. resolve(result);
  571. }
  572. ])
  573. );
  574. } catch (err) {
  575. reject(err);
  576. IOModule.log(
  577. "ERROR",
  578. "IO_ACTION_ERROR",
  579. `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
  580. );
  581. }
  582. });
  583. }
  584. }
  // The module's default export is a single instance created at import time; constructing it
  // also assigns the module-level IOModule reference.
  585. export default new _IOModule();