stations.js 54 KB


  1. import async from "async";
  2. import { isLoginRequired, isOwnerRequired } from "./hooks";
  3. import moduleManager from "../../index";
  // Handles to the backend modules used by the station actions in this file.
  const {
    db: DBModule,
    utils: UtilsModule,
    io: IOModule,
    songs: SongsModule,
    cache: CacheModule,
    notifications: NotificationsModule,
    stations: StationsModule,
    activities: ActivitiesModule,
    youtube: YouTubeModule
  } = moduleManager.modules;

  // Socket id -> id of the station that socket most recently joined (written by `join`).
  const userList = {};
  // Station id -> user list that is sent out with "event:users.updated".
  const usersPerStation = {};
  // Station id -> user count that is sent out with "event:userCount.updated".
  const usersPerStationCount = {};
  16. // Temporarily disabled until the messages in console can be limited
  17. // setInterval(async () => {
  18. // const stationsCountUpdated = [];
  19. // const stationsUpdated = [];
  20. // const oldUsersPerStation = usersPerStation;
  21. // usersPerStation = {};
  22. // const oldUsersPerStationCount = usersPerStationCount;
  23. // usersPerStationCount = {};
  24. // const userModel = await DBModule.runJob("GET_MODEL", {
  25. // modelName: "user"
  26. // });
  27. // async.each(
  28. // Object.keys(userList),
  29. // (socketId, next) => {
  30. // IOModule.runJob("SOCKET_FROM_SESSION", { socketId }, { isQuiet: true }).then(socket => {
  31. // const stationId = userList[socketId];
  32. // if (!socket || Object.keys(socket.rooms).indexOf(`station.${stationId}`) === -1) {
  33. // if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  34. // if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
  35. // delete userList[socketId];
  36. // return next();
  37. // }
  38. // if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0;
  39. // usersPerStationCount[stationId] += 1;
  40. // if (!usersPerStation[stationId]) usersPerStation[stationId] = [];
  41. // return async.waterfall(
  42. // [
  43. // next => {
  44. // if (!socket.session || !socket.session.sessionId) return next("No session found.");
  45. // return CacheModule
  46. // .runJob("HGET", {
  47. // table: "sessions",
  48. // key: socket.session.sessionId
  49. // })
  50. // .then(session => {
  51. // next(null, session);
  52. // })
  53. // .catch(next);
  54. // },
  55. // (session, next) => {
  56. // if (!session) return next("Session not found.");
  57. // return userModel.findOne({ _id: session.userId }, next);
  58. // },
  59. // (user, next) => {
  60. // if (!user) return next("User not found.");
  61. // if (usersPerStation[stationId].indexOf(user.username) !== -1)
  62. // return next("User already in the list.");
  63. // return next(null, user.username);
  64. // }
  65. // ],
  66. // (err, username) => {
  67. // if (!err) {
  68. // usersPerStation[stationId].push(username);
  69. // }
  70. // next();
  71. // }
  72. // );
  73. // });
  74. // // TODO Code to show users
  75. // },
  76. // () => {
  77. // Object.keys(usersPerStationCount).forEach(stationId => {
  78. // if (oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]) {
  79. // if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  80. // }
  81. // })
  82. //
  83. // Object.keys(usersPerStation).forEach(stationId => {
  84. // if (
  85. // _.difference(usersPerStation[stationId], oldUsersPerStation[stationId]).length > 0 ||
  86. // _.difference(oldUsersPerStation[stationId], usersPerStation[stationId]).length > 0
  87. // ) {
  88. // if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
  89. // }
  90. // });
  91. //
  92. // stationsCountUpdated.forEach(stationId => {
  93. // console.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  94. // CacheModule.runJob("PUB", {
  95. // table: "station.updateUserCount",
  96. // value: stationId
  97. // });
  98. // });
  99. // stationsUpdated.forEach(stationId => {
  100. // console.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  101. // CacheModule.runJob("PUB", {
  102. // table: "station.updateUsers",
  103. // value: stationId
  104. // });
  105. // });
  106. // // console.log("Userlist", usersPerStation);
  107. // }
  108. // );
  109. // }, 3000);
  // Forward "station.updateUsers" messages to the station's room, sending the user list
  // currently held in memory for that station (an empty list when none is recorded).
  CacheModule.runJob("SUB", {
    channel: "station.updateUsers",
    cb: stationId => {
      const knownUsers = usersPerStation[stationId];
      const room = `station.${stationId}`;
      const args = ["event:users.updated", knownUsers || []];

      IOModule.runJob("EMIT_TO_ROOM", { room, args });
    }
  });
  /*
   * Forward "station.updateUserCount" messages.
   *
   * The count always goes to the station's own room. On the home page it goes to everyone
   * when the station is public; otherwise only to sockets whose logged-in user is an admin
   * or the owner of a community station.
   */
  CacheModule.runJob("SUB", {
    channel: "station.updateUserCount",
    cb: stationId => {
      const count = usersPerStationCount[stationId] || 0;

      IOModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:userCount.updated", count]
      });

      StationsModule.runJob("GET_STATION", { stationId })
        .then(async station => {
          // Nothing more to announce if the station is gone.
          if (!station) return;

          if (station.privacy === "public") {
            IOModule.runJob("EMIT_TO_ROOM", {
              room: "home",
              args: ["event:userCount.updated", stationId, count]
            });
            return;
          }

          // The model does not depend on the socket, so fetch it once instead of once per socket.
          const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
          const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" });

          Object.keys(sockets).forEach(socketKey => {
            const socket = sockets[socketKey];
            // Sockets without a session (or without a session id) are anonymous and never qualify.
            const sessionId = socket.session && socket.session.sessionId;
            if (!sessionId) return;

            CacheModule.runJob("HGET", { table: "sessions", key: sessionId })
              .then(session => {
                if (!session) return;

                userModel.findOne({ _id: session.userId }, (err, user) => {
                  // A failed lookup or a deleted user must not throw inside the database callback.
                  if (err || !user) return;

                  const isAdmin = user.role === "admin";
                  const isOwner = station.type === "community" && station.owner === session.userId;
                  if (isAdmin || isOwner) socket.emit("event:userCount.updated", stationId, count);
                });
              })
              .catch(err => {
                console.log(
                  "ERROR",
                  "STATION_UPDATE_USER_COUNT",
                  `Looking up a session while updating the user count of "${stationId}" failed. "${err}"`
                );
              });
          });
        })
        .catch(err => {
          console.log(
            "ERROR",
            "STATION_UPDATE_USER_COUNT",
            `Announcing the user count of station "${stationId}" failed. "${err}"`
          );
        });
    }
  });
  // The three subscriptions below are plain relays: each takes the published payload and
  // forwards one of its fields to the room of the station named in `data.stationId`.

  // Queue lock state changed.
  CacheModule.runJob("SUB", {
    channel: "station.queueLockToggled",
    cb: data => {
      const room = `station.${data.stationId}`;
      const args = ["event:queueLockToggled", data.locked];
      IOModule.runJob("EMIT_TO_ROOM", { room, args });
    }
  });

  // Party mode changed.
  CacheModule.runJob("SUB", {
    channel: "station.updatePartyMode",
    cb: data => {
      const room = `station.${data.stationId}`;
      const args = ["event:partyMode.updated", data.partyMode];
      IOModule.runJob("EMIT_TO_ROOM", { room, args });
    }
  });

  // A private playlist was selected for the station.
  CacheModule.runJob("SUB", {
    channel: "privatePlaylist.selected",
    cb: data => {
      const room = `station.${data.stationId}`;
      const args = ["event:privatePlaylist.selected", data.playlistId];
      IOModule.runJob("EMIT_TO_ROOM", { room, args });
    }
  });
  // The three subscriptions below load the station named in the message and forward one piece
  // of its state to the station's room. Each chain ends in a catch so that a failed lookup (or a
  // station that no longer exists) is logged instead of surfacing as an unhandled rejection.

  // Station paused: tell the room when the pause started.
  CacheModule.runJob("SUB", {
    channel: "station.pause",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId })
        .then(station => {
          IOModule.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:stations.pause", { pausedAt: station.pausedAt }]
          });
        })
        .catch(err => {
          console.log("ERROR", "STATION_PAUSE_EVENT", `Announcing pause of station "${stationId}" failed. "${err}"`);
        });
    }
  });

  // Station resumed: tell the room the station's `timePaused` value.
  CacheModule.runJob("SUB", {
    channel: "station.resume",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId })
        .then(station => {
          IOModule.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:stations.resume", { timePaused: station.timePaused }]
          });
        })
        .catch(err => {
          console.log("ERROR", "STATION_RESUME_EVENT", `Announcing resume of station "${stationId}" failed. "${err}"`);
        });
    }
  });

  // Queue changed: send the room the station's current queue.
  CacheModule.runJob("SUB", {
    channel: "station.queueUpdate",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId })
        .then(station => {
          IOModule.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:queue.update", station.queue]
          });
        })
        .catch(err => {
          console.log(
            "ERROR",
            "STATION_QUEUE_UPDATE_EVENT",
            `Announcing queue update of station "${stationId}" failed. "${err}"`
          );
        });
    }
  });
  // A skip vote was cast: notify the station's room (the event carries no payload).
  CacheModule.runJob("SUB", {
    channel: "station.voteSkipSong",
    cb: stationId => {
      const room = `station.${stationId}`;
      IOModule.runJob("EMIT_TO_ROOM", { room, args: ["event:song.voteSkipSong"] });
    }
  });

  // A station was removed: notify the station's own room first, then the admin station list.
  CacheModule.runJob("SUB", {
    channel: "station.remove",
    cb: stationId => {
      const announcements = [
        { room: `station.${stationId}`, args: ["event:stations.remove"] },
        { room: "admin.stations", args: ["event:admin.station.removed", stationId] }
      ];

      announcements.forEach(announcement => {
        IOModule.runJob("EMIT_TO_ROOM", announcement);
      });
    }
  });
  /*
   * Handle "station.create" messages.
   *
   * Initializes the new station, announces it to the admin station list, and then announces it
   * on the home page: to everyone when the station is public, otherwise only to sockets whose
   * logged-in user is an admin or the owner of a community station.
   */
  CacheModule.runJob("SUB", {
    channel: "station.create",
    cb: async stationId => {
      try {
        const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
        const { station } = await StationsModule.runJob("INITIALIZE_STATION", { stationId });

        station.userCount = usersPerStationCount[stationId] || 0;

        IOModule.runJob("EMIT_TO_ROOM", {
          room: "admin.stations",
          args: ["event:admin.station.added", station]
        });

        // TODO If community, check if on whitelist
        if (station.privacy === "public") {
          IOModule.runJob("EMIT_TO_ROOM", {
            room: "home",
            args: ["event:stations.created", station]
          });
          return;
        }

        const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" });

        Object.keys(sockets).forEach(socketKey => {
          const socket = sockets[socketKey];
          // Sockets without a session (or without a session id) are anonymous and never qualify.
          const sessionId = socket.session && socket.session.sessionId;
          if (!sessionId) return;

          CacheModule.runJob("HGET", { table: "sessions", key: sessionId })
            .then(session => {
              if (!session) return;

              userModel.findOne({ _id: session.userId }, (err, user) => {
                // A failed lookup or a deleted user must not throw inside the database callback.
                if (err || !user) return;

                const isAdmin = user.role === "admin";
                const isOwner = station.type === "community" && station.owner === session.userId;
                if (isAdmin || isOwner) socket.emit("event:stations.created", station);
              });
            })
            .catch(err => {
              console.log(
                "ERROR",
                "STATION_CREATE_EVENT",
                `Looking up a session while announcing station "${stationId}" failed. "${err}"`
              );
            });
        });
      } catch (err) {
        // Covers a failed model fetch, a failed station initialization and a failed room lookup.
        console.log("ERROR", "STATION_CREATE_EVENT", `Announcing new station "${stationId}" failed. "${err}"`);
      }
    }
  });
  289. export default {
  290. /**
  291. * Get a list of all the stations
  292. *
  293. * @param {object} session - user session
  294. * @param {Function} cb - callback
  295. */
  296. index: (session, cb) => {
  297. async.waterfall(
  298. [
  299. next => {
  300. CacheModule.runJob("HGETALL", { table: "stations" }).then(stations => {
  301. next(null, stations);
  302. });
  303. },
  304. (items, next) => {
  305. const filteredStations = [];
  306. async.each(
  307. items,
  308. (station, next) => {
  309. async.waterfall(
  310. [
  311. next => {
  312. StationsModule.runJob("CAN_USER_VIEW_STATION", {
  313. station,
  314. userId: session.userId,
  315. hideUnlisted: true
  316. })
  317. .then(exists => {
  318. next(null, exists);
  319. })
  320. .catch(next);
  321. }
  322. ],
  323. (err, exists) => {
  324. if (err) console.log(err);
  325. station.userCount = usersPerStationCount[station._id] || 0;
  326. if (exists) filteredStations.push(station);
  327. next();
  328. }
  329. );
  330. },
  331. () => next(null, filteredStations)
  332. );
  333. }
  334. ],
  335. async (err, stations) => {
  336. if (err) {
  337. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  338. console.log("ERROR", "STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  339. return cb({ status: "failure", message: err });
  340. }
  341. console.log("SUCCESS", "STATIONS_INDEX", `Indexing stations successful.`, false);
  342. return cb({ status: "success", stations });
  343. }
  344. );
  345. },
  346. /**
  347. * Obtains basic metadata of a station in order to format an activity
  348. *
  349. * @param {object} session - user session
  350. * @param {string} stationId - the station id
  351. * @param {Function} cb - callback
  352. */
  353. getStationForActivity: (session, stationId, cb) => {
  354. async.waterfall(
  355. [
  356. next => {
  357. StationsModule.runJob("GET_STATION", { stationId })
  358. .then(station => {
  359. next(null, station);
  360. })
  361. .catch(next);
  362. }
  363. ],
  364. async (err, station) => {
  365. if (err) {
  366. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  367. console.log(
  368. "ERROR",
  369. "STATIONS_GET_STATION_FOR_ACTIVITY",
  370. `Failed to obtain metadata of station ${stationId} for activity formatting. "${err}"`
  371. );
  372. return cb({ status: "failure", message: err });
  373. }
  374. console.log(
  375. "SUCCESS",
  376. "STATIONS_GET_STATION_FOR_ACTIVITY",
  377. `Obtained metadata of station ${stationId} for activity formatting successfully.`
  378. );
  379. return cb({
  380. status: "success",
  381. data: {
  382. title: station.displayName,
  383. thumbnail: station.currentSong ? station.currentSong.thumbnail : ""
  384. }
  385. });
  386. }
  387. );
  388. },
  389. /**
  390. * Verifies that a station exists
  391. *
  392. * @param {object} session - user session
  393. * @param {string} stationName - the station name
  394. * @param {Function} cb - callback
  395. */
  396. existsByName: (session, stationName, cb) => {
  397. async.waterfall(
  398. [
  399. next => {
  400. StationsModule.runJob("GET_STATION_BY_NAME", { stationName })
  401. .then(station => {
  402. next(null, station);
  403. })
  404. .catch(next);
  405. },
  406. (station, next) => {
  407. if (!station) return next(null, false);
  408. return StationsModule.runJob("CAN_USER_VIEW_STATION", {
  409. station,
  410. userId: session.userId
  411. })
  412. .then(exists => {
  413. next(null, exists);
  414. })
  415. .catch(next);
  416. }
  417. ],
  418. async (err, exists) => {
  419. if (err) {
  420. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  421. console.log(
  422. "ERROR",
  423. "STATION_EXISTS_BY_NAME",
  424. `Checking if station "${stationName}" exists failed. "${err}"`
  425. );
  426. return cb({ status: "failure", message: err });
  427. }
  428. console.log(
  429. "SUCCESS",
  430. "STATION_EXISTS_BY_NAME",
  431. `Station "${stationName}" exists successfully.` /* , false */
  432. );
  433. return cb({ status: "success", exists });
  434. }
  435. );
  436. },
  437. /**
  438. * Gets the official playlist for a station
  439. *
  440. * @param {object} session - user session
  441. * @param {string} stationId - the station id
  442. * @param {Function} cb - callback
  443. */
  444. getPlaylist: (session, stationId, cb) => {
  445. async.waterfall(
  446. [
  447. next => {
  448. StationsModule.runJob("GET_STATION", { stationId })
  449. .then(station => {
  450. next(null, station);
  451. })
  452. .catch(next);
  453. },
  454. (station, next) => {
  455. StationsModule.runJob("CAN_USER_VIEW_STATION", {
  456. station,
  457. userId: session.userId
  458. })
  459. .then(canView => {
  460. if (canView) return next(null, station);
  461. return next("Insufficient permissions.");
  462. })
  463. .catch(err => next(err));
  464. },
  465. (station, next) => {
  466. if (!station) return next("Station not found.");
  467. if (station.type !== "official") return next("This is not an official station.");
  468. return next();
  469. },
  470. next => {
  471. CacheModule.runJob("HGET", {
  472. table: "officialPlaylists",
  473. key: stationId
  474. })
  475. .then(playlist => {
  476. next(null, playlist);
  477. })
  478. .catch(next);
  479. },
  480. (playlist, next) => {
  481. if (!playlist) return next("Playlist not found.");
  482. return next(null, playlist);
  483. }
  484. ],
  485. async (err, playlist) => {
  486. if (err) {
  487. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  488. console.log(
  489. "ERROR",
  490. "STATIONS_GET_PLAYLIST",
  491. `Getting playlist for station "${stationId}" failed. "${err}"`
  492. );
  493. return cb({ status: "failure", message: err });
  494. }
  495. console.log(
  496. "SUCCESS",
  497. "STATIONS_GET_PLAYLIST",
  498. `Got playlist for station "${stationId}" successfully.`,
  499. false
  500. );
  501. return cb({ status: "success", data: playlist.songs });
  502. }
  503. );
  504. },
  505. /**
  506. * Joins the station by its name
  507. *
  508. * @param {object} session - user session
  509. * @param {string} stationName - the station name
  510. * @param {Function} cb - callback
  511. */
  512. join: (session, stationName, cb) => {
  513. async.waterfall(
  514. [
  515. next => {
  516. StationsModule.runJob("GET_STATION_BY_NAME", { stationName })
  517. .then(station => {
  518. next(null, station);
  519. })
  520. .catch(next);
  521. },
  522. (station, next) => {
  523. if (!station) return next("Station not found.");
  524. return StationsModule.runJob("CAN_USER_VIEW_STATION", {
  525. station,
  526. userId: session.userId
  527. })
  528. .then(canView => {
  529. if (!canView) next("Not allowed to join station.");
  530. else next(null, station);
  531. })
  532. .catch(err => next(err));
  533. },
  534. (station, next) => {
  535. IOModule.runJob("SOCKET_JOIN_ROOM", {
  536. socketId: session.socketId,
  537. room: `station.${station._id}`
  538. });
  539. const data = {
  540. _id: station._id,
  541. type: station.type,
  542. currentSong: station.currentSong,
  543. startedAt: station.startedAt,
  544. paused: station.paused,
  545. timePaused: station.timePaused,
  546. pausedAt: station.pausedAt,
  547. description: station.description,
  548. displayName: station.displayName,
  549. privacy: station.privacy,
  550. locked: station.locked,
  551. partyMode: station.partyMode,
  552. owner: station.owner,
  553. privatePlaylist: station.privatePlaylist,
  554. genres: station.genres,
  555. blacklistedGenres: station.blacklistedGenres
  556. };
  557. userList[session.socketId] = station._id;
  558. next(null, data);
  559. },
  560. (data, next) => {
  561. data = JSON.parse(JSON.stringify(data));
  562. data.userCount = usersPerStationCount[data._id] || 0;
  563. data.users = usersPerStation[data._id] || [];
  564. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  565. IOModule.runJob("SOCKET_JOIN_SONG_ROOM", {
  566. socketId: session.socketId,
  567. room: `song.${data.currentSong.songId}`
  568. });
  569. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  570. return SongsModule.runJob("GET_SONG_FROM_ID", {
  571. songId: data.currentSong.songId
  572. })
  573. .then(response => {
  574. const { song } = response;
  575. if (song) {
  576. data.currentSong.likes = song.likes;
  577. data.currentSong.dislikes = song.dislikes;
  578. } else {
  579. data.currentSong.likes = -1;
  580. data.currentSong.dislikes = -1;
  581. }
  582. })
  583. .catch(() => {
  584. data.currentSong.likes = -1;
  585. data.currentSong.dislikes = -1;
  586. })
  587. .finally(() => {
  588. next(null, data);
  589. });
  590. }
  591. ],
  592. async (err, data) => {
  593. if (err) {
  594. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  595. console.log("ERROR", "STATIONS_JOIN", `Joining station "${stationName}" failed. "${err}"`);
  596. return cb({ status: "failure", message: err });
  597. }
  598. console.log("SUCCESS", "STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  599. return cb({ status: "success", data });
  600. }
  601. );
  602. },
  603. /**
  604. * Toggles if a station is locked
  605. *
  606. * @param session
  607. * @param stationId - the station id
  608. * @param cb
  609. */
  610. toggleLock: isOwnerRequired(async (session, stationId, cb) => {
  611. const stationModel = await DBModule.runJob("GET_MODEL", {
  612. modelName: "station"
  613. });
  614. async.waterfall(
  615. [
  616. next => {
  617. StationsModule.runJob("GET_STATION", { stationId })
  618. .then(station => {
  619. next(null, station);
  620. })
  621. .catch(next);
  622. },
  623. (station, next) => {
  624. stationModel.updateOne({ _id: stationId }, { $set: { locked: !station.locked } }, next);
  625. },
  626. (res, next) => {
  627. StationsModule.runJob("UPDATE_STATION", { stationId })
  628. .then(station => {
  629. next(null, station);
  630. })
  631. .catch(next);
  632. }
  633. ],
  634. async (err, station) => {
  635. if (err) {
  636. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  637. console.log(
  638. "ERROR",
  639. "STATIONS_UPDATE_LOCKED_STATUS",
  640. `Toggling the queue lock for station "${stationId}" failed. "${err}"`
  641. );
  642. return cb({ status: "failure", message: err });
  643. }
  644. console.log(
  645. "SUCCESS",
  646. "STATIONS_UPDATE_LOCKED_STATUS",
  647. `Toggled the queue lock for station "${stationId}" successfully to "${station.locked}".`
  648. );
  649. CacheModule.runJob("PUB", {
  650. channel: "station.queueLockToggled",
  651. value: {
  652. stationId,
  653. locked: station.locked
  654. }
  655. });
  656. return cb({ status: "success", data: station.locked });
  657. }
  658. );
  659. }),
  660. /**
  661. * Votes to skip a station
  662. *
  663. * @param session
  664. * @param stationId - the station id
  665. * @param cb
  666. */
  667. voteSkip: isLoginRequired(async (session, stationId, cb) => {
  668. const stationModel = await DBModule.runJob("GET_MODEL", {
  669. modelName: "station"
  670. });
  671. let skipVotes = 0;
  672. let shouldSkip = false;
  673. async.waterfall(
  674. [
  675. next => {
  676. StationsModule.runJob("GET_STATION", { stationId })
  677. .then(station => {
  678. next(null, station);
  679. })
  680. .catch(next);
  681. },
  682. (station, next) => {
  683. if (!station) return next("Station not found.");
  684. return StationsModule.runJob("CAN_USER_VIEW_STATION", {
  685. station,
  686. userId: session.userId
  687. })
  688. .then(canView => {
  689. if (canView) return next(null, station);
  690. return next("Insufficient permissions.");
  691. })
  692. .catch(err => next(err));
  693. },
  694. (station, next) => {
  695. if (!station.currentSong) return next("There is currently no song to skip.");
  696. if (station.currentSong.skipVotes.indexOf(session.userId) !== -1)
  697. return next("You have already voted to skip this song.");
  698. return next(null, station);
  699. },
  700. (station, next) => {
  701. stationModel.updateOne(
  702. { _id: stationId },
  703. { $push: { "currentSong.skipVotes": session.userId } },
  704. next
  705. );
  706. },
  707. (res, next) => {
  708. StationsModule.runJob("UPDATE_STATION", { stationId })
  709. .then(station => {
  710. next(null, station);
  711. })
  712. .catch(next);
  713. },
  714. (station, next) => {
  715. if (!station) return next("Station not found.");
  716. return next(null, station);
  717. },
  718. (station, next) => {
  719. skipVotes = station.currentSong.skipVotes.length;
  720. IOModule.runJob("GET_ROOM_SOCKETS", {
  721. room: `station.${stationId}`
  722. })
  723. .then(sockets => {
  724. next(null, sockets);
  725. })
  726. .catch(next);
  727. },
  728. (sockets, next) => {
  729. if (sockets.length <= skipVotes) shouldSkip = true;
  730. next();
  731. }
  732. ],
  733. async err => {
  734. if (err) {
  735. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  736. console.log("ERROR", "STATIONS_VOTE_SKIP", `Vote skipping station "${stationId}" failed. "${err}"`);
  737. return cb({ status: "failure", message: err });
  738. }
  739. console.log("SUCCESS", "STATIONS_VOTE_SKIP", `Vote skipping "${stationId}" successful.`);
  740. CacheModule.runJob("PUB", {
  741. channel: "station.voteSkipSong",
  742. value: stationId
  743. });
  744. if (shouldSkip) StationsModule.runJob("SKIP_STATION", { stationId });
  745. return cb({
  746. status: "success",
  747. message: "Successfully voted to skip the song."
  748. });
  749. }
  750. );
  751. }),
  752. /**
  753. * Force skips a station
  754. *
  755. * @param session
  756. * @param stationId - the station id
  757. * @param cb
  758. */
  759. forceSkip: isOwnerRequired((session, stationId, cb) => {
  760. async.waterfall(
  761. [
  762. next => {
  763. StationsModule.runJob("GET_STATION", { stationId })
  764. .then(station => {
  765. next(null, station);
  766. })
  767. .catch(next);
  768. },
  769. (station, next) => {
  770. if (!station) return next("Station not found.");
  771. return next();
  772. }
  773. ],
  774. async err => {
  775. if (err) {
  776. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  777. console.log(
  778. "ERROR",
  779. "STATIONS_FORCE_SKIP",
  780. `Force skipping station "${stationId}" failed. "${err}"`
  781. );
  782. return cb({ status: "failure", message: err });
  783. }
  784. NotificationsModule.runJob("UNSCHEDULE", {
  785. name: `stations.nextSong?id=${stationId}`
  786. });
  787. StationsModule.runJob("SKIP_STATION", { stationId });
  788. console.log("SUCCESS", "STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  789. return cb({
  790. status: "success",
  791. message: "Successfully skipped station."
  792. });
  793. }
  794. );
  795. }),
  796. /**
  797. * Leaves the user's current station
  798. *
  799. * @param {object} session - user session
  800. * @param {string} stationId - id of station to leave
  801. * @param {Function} cb - callback
  802. */
  803. leave: (session, stationId, cb) => {
  804. async.waterfall(
  805. [
  806. next => {
  807. StationsModule.runJob("GET_STATION", { stationId })
  808. .then(station => {
  809. next(null, station);
  810. })
  811. .catch(next);
  812. },
  813. (station, next) => {
  814. if (!station) return next("Station not found.");
  815. return next();
  816. }
  817. ],
  818. async (err, userCount) => {
  819. if (err) {
  820. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  821. console.log("ERROR", "STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  822. return cb({ status: "failure", message: err });
  823. }
  824. console.log("SUCCESS", "STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  825. IOModule.runJob("SOCKET_LEAVE_ROOMS", { socketId: session });
  826. delete userList[session.socketId];
  827. return cb({
  828. status: "success",
  829. message: "Successfully left station.",
  830. userCount
  831. });
  832. }
  833. );
  834. },
  835. /**
  836. * Updates a station's name
  837. *
  838. * @param session
  839. * @param stationId - the station id
  840. * @param newName - the new station name
  841. * @param cb
  842. */
  843. updateName: isOwnerRequired(async (session, stationId, newName, cb) => {
  844. const stationModel = await DBModule.runJob("GET_MODEL", {
  845. modelName: "station"
  846. });
  847. async.waterfall(
  848. [
  849. next => {
  850. stationModel.updateOne(
  851. { _id: stationId },
  852. { $set: { name: newName } },
  853. { runValidators: true },
  854. next
  855. );
  856. },
  857. (res, next) => {
  858. StationsModule.runJob("UPDATE_STATION", { stationId })
  859. .then(station => {
  860. next(null, station);
  861. })
  862. .catch(next);
  863. }
  864. ],
  865. async err => {
  866. if (err) {
  867. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  868. console.log(
  869. "ERROR",
  870. "STATIONS_UPDATE_NAME",
  871. `Updating station "${stationId}" name to "${newName}" failed. "${err}"`
  872. );
  873. return cb({ status: "failure", message: err });
  874. }
  875. console.log(
  876. "SUCCESS",
  877. "STATIONS_UPDATE_NAME",
  878. `Updated station "${stationId}" name to "${newName}" successfully.`
  879. );
  880. return cb({
  881. status: "success",
  882. message: "Successfully updated the name."
  883. });
  884. }
  885. );
  886. }),
  887. /**
  888. * Updates a station's display name
  889. *
  890. * @param session
  891. * @param stationId - the station id
  892. * @param newDisplayName - the new station display name
  893. * @param cb
  894. */
  895. updateDisplayName: isOwnerRequired(async (session, stationId, newDisplayName, cb) => {
  896. const stationModel = await DBModule.runJob("GET_MODEL", {
  897. modelName: "station"
  898. });
  899. async.waterfall(
  900. [
  901. next => {
  902. stationModel.updateOne(
  903. { _id: stationId },
  904. { $set: { displayName: newDisplayName } },
  905. { runValidators: true },
  906. next
  907. );
  908. },
  909. (res, next) => {
  910. StationsModule.runJob("UPDATE_STATION", { stationId })
  911. .then(station => {
  912. next(null, station);
  913. })
  914. .catch(next);
  915. }
  916. ],
  917. async err => {
  918. if (err) {
  919. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  920. console.log(
  921. "ERROR",
  922. "STATIONS_UPDATE_DISPLAY_NAME",
  923. `Updating station "${stationId}" displayName to "${newDisplayName}" failed. "${err}"`
  924. );
  925. return cb({ status: "failure", message: err });
  926. }
  927. console.log(
  928. "SUCCESS",
  929. "STATIONS_UPDATE_DISPLAY_NAME",
  930. `Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`
  931. );
  932. return cb({
  933. status: "success",
  934. message: "Successfully updated the display name."
  935. });
  936. }
  937. );
  938. }),
  939. /**
  940. * Updates a station's description
  941. *
  942. * @param session
  943. * @param stationId - the station id
  944. * @param newDescription - the new station description
  945. * @param cb
  946. */
  947. updateDescription: isOwnerRequired(async (session, stationId, newDescription, cb) => {
  948. const stationModel = await DBModule.runJob("GET_MODEL", {
  949. modelName: "station"
  950. });
  951. async.waterfall(
  952. [
  953. next => {
  954. stationModel.updateOne(
  955. { _id: stationId },
  956. { $set: { description: newDescription } },
  957. { runValidators: true },
  958. next
  959. );
  960. },
  961. (res, next) => {
  962. StationsModule.runJob("UPDATE_STATION", { stationId })
  963. .then(station => {
  964. next(null, station);
  965. })
  966. .catch(next);
  967. }
  968. ],
  969. async err => {
  970. if (err) {
  971. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  972. console.log(
  973. "ERROR",
  974. "STATIONS_UPDATE_DESCRIPTION",
  975. `Updating station "${stationId}" description to "${newDescription}" failed. "${err}"`
  976. );
  977. return cb({ status: "failure", message: err });
  978. }
  979. console.log(
  980. "SUCCESS",
  981. "STATIONS_UPDATE_DESCRIPTION",
  982. `Updated station "${stationId}" description to "${newDescription}" successfully.`
  983. );
  984. return cb({
  985. status: "success",
  986. message: "Successfully updated the description."
  987. });
  988. }
  989. );
  990. }),
  991. /**
  992. * Updates a station's privacy
  993. *
  994. * @param session
  995. * @param stationId - the station id
  996. * @param newPrivacy - the new station privacy
  997. * @param cb
  998. */
  999. updatePrivacy: isOwnerRequired(async (session, stationId, newPrivacy, cb) => {
  1000. const stationModel = await DBModule.runJob("GET_MODEL", {
  1001. modelName: "station"
  1002. });
  1003. async.waterfall(
  1004. [
  1005. next => {
  1006. stationModel.updateOne(
  1007. { _id: stationId },
  1008. { $set: { privacy: newPrivacy } },
  1009. { runValidators: true },
  1010. next
  1011. );
  1012. },
  1013. (res, next) => {
  1014. StationsModule.runJob("UPDATE_STATION", { stationId })
  1015. .then(station => {
  1016. next(null, station);
  1017. })
  1018. .catch(next);
  1019. }
  1020. ],
  1021. async err => {
  1022. if (err) {
  1023. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1024. console.log(
  1025. "ERROR",
  1026. "STATIONS_UPDATE_PRIVACY",
  1027. `Updating station "${stationId}" privacy to "${newPrivacy}" failed. "${err}"`
  1028. );
  1029. return cb({ status: "failure", message: err });
  1030. }
  1031. console.log(
  1032. "SUCCESS",
  1033. "STATIONS_UPDATE_PRIVACY",
  1034. `Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`
  1035. );
  1036. return cb({
  1037. status: "success",
  1038. message: "Successfully updated the privacy."
  1039. });
  1040. }
  1041. );
  1042. }),
  1043. /**
  1044. * Updates a station's genres
  1045. *
  1046. * @param session
  1047. * @param stationId - the station id
  1048. * @param newGenres - the new station genres
  1049. * @param cb
  1050. */
  1051. updateGenres: isOwnerRequired(async (session, stationId, newGenres, cb) => {
  1052. const stationModel = await DBModule.runJob("GET_MODEL", {
  1053. modelName: "station"
  1054. });
  1055. async.waterfall(
  1056. [
  1057. next => {
  1058. stationModel.updateOne(
  1059. { _id: stationId },
  1060. { $set: { genres: newGenres } },
  1061. { runValidators: true },
  1062. next
  1063. );
  1064. },
  1065. (res, next) => {
  1066. StationsModule.runJob("UPDATE_STATION", { stationId })
  1067. .then(station => {
  1068. next(null, station);
  1069. })
  1070. .catch(next);
  1071. }
  1072. ],
  1073. async err => {
  1074. if (err) {
  1075. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1076. console.log(
  1077. "ERROR",
  1078. "STATIONS_UPDATE_GENRES",
  1079. `Updating station "${stationId}" genres to "${newGenres}" failed. "${err}"`
  1080. );
  1081. return cb({ status: "failure", message: err });
  1082. }
  1083. console.log(
  1084. "SUCCESS",
  1085. "STATIONS_UPDATE_GENRES",
  1086. `Updated station "${stationId}" genres to "${newGenres}" successfully.`
  1087. );
  1088. return cb({
  1089. status: "success",
  1090. message: "Successfully updated the genres."
  1091. });
  1092. }
  1093. );
  1094. }),
  1095. /**
  1096. * Updates a station's blacklisted genres
  1097. *
  1098. * @param session
  1099. * @param stationId - the station id
  1100. * @param newBlacklistedGenres - the new station blacklisted genres
  1101. * @param cb
  1102. */
  1103. updateBlacklistedGenres: isOwnerRequired(async (session, stationId, newBlacklistedGenres, cb) => {
  1104. const stationModel = await DBModule.runJob("GET_MODEL", {
  1105. modelName: "station"
  1106. });
  1107. async.waterfall(
  1108. [
  1109. next => {
  1110. stationModel.updateOne(
  1111. { _id: stationId },
  1112. {
  1113. $set: {
  1114. blacklistedGenres: newBlacklistedGenres
  1115. }
  1116. },
  1117. { runValidators: true },
  1118. next
  1119. );
  1120. },
  1121. (res, next) => {
  1122. StationsModule.runJob("UPDATE_STATION", { stationId })
  1123. .then(station => {
  1124. next(null, station);
  1125. })
  1126. .catch(next);
  1127. }
  1128. ],
  1129. async err => {
  1130. if (err) {
  1131. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1132. console.log(
  1133. "ERROR",
  1134. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1135. `Updating station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" failed. "${err}"`
  1136. );
  1137. return cb({ status: "failure", message: err });
  1138. }
  1139. console.log(
  1140. "SUCCESS",
  1141. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1142. `Updated station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" successfully.`
  1143. );
  1144. return cb({
  1145. status: "success",
  1146. message: "Successfully updated the blacklisted genres."
  1147. });
  1148. }
  1149. );
  1150. }),
  1151. /**
  1152. * Updates a station's party mode
  1153. *
  1154. * @param session
  1155. * @param stationId - the station id
  1156. * @param newPartyMode - the new station party mode
  1157. * @param cb
  1158. */
  1159. updatePartyMode: isOwnerRequired(async (session, stationId, newPartyMode, cb) => {
  1160. const stationModel = await DBModule.runJob("GET_MODEL", {
  1161. modelName: "station"
  1162. });
  1163. async.waterfall(
  1164. [
  1165. next => {
  1166. StationsModule.runJob("GET_STATION", { stationId })
  1167. .then(station => {
  1168. next(null, station);
  1169. })
  1170. .catch(next);
  1171. },
  1172. (station, next) => {
  1173. if (!station) return next("Station not found.");
  1174. if (station.partyMode === newPartyMode)
  1175. return next(`The party mode was already ${newPartyMode ? "enabled." : "disabled."}`);
  1176. return stationModel.updateOne(
  1177. { _id: stationId },
  1178. { $set: { partyMode: newPartyMode } },
  1179. { runValidators: true },
  1180. next
  1181. );
  1182. },
  1183. (res, next) => {
  1184. StationsModule.runJob("UPDATE_STATION", { stationId })
  1185. .then(station => {
  1186. next(null, station);
  1187. })
  1188. .catch(next);
  1189. }
  1190. ],
  1191. async err => {
  1192. if (err) {
  1193. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1194. console.log(
  1195. "ERROR",
  1196. "STATIONS_UPDATE_PARTY_MODE",
  1197. `Updating station "${stationId}" party mode to "${newPartyMode}" failed. "${err}"`
  1198. );
  1199. return cb({ status: "failure", message: err });
  1200. }
  1201. console.log(
  1202. "SUCCESS",
  1203. "STATIONS_UPDATE_PARTY_MODE",
  1204. `Updated station "${stationId}" party mode to "${newPartyMode}" successfully.`
  1205. );
  1206. CacheModule.runJob("PUB", {
  1207. channel: "station.updatePartyMode",
  1208. value: {
  1209. stationId,
  1210. partyMode: newPartyMode
  1211. }
  1212. });
  1213. StationsModule.runJob("SKIP_STATION", { stationId });
  1214. return cb({
  1215. status: "success",
  1216. message: "Successfully updated the party mode."
  1217. });
  1218. }
  1219. );
  1220. }),
  1221. /**
  1222. * Pauses a station
  1223. *
  1224. * @param session
  1225. * @param stationId - the station id
  1226. * @param cb
  1227. */
  1228. pause: isOwnerRequired(async (session, stationId, cb) => {
  1229. const stationModel = await DBModule.runJob("GET_MODEL", {
  1230. modelName: "station"
  1231. });
  1232. async.waterfall(
  1233. [
  1234. next => {
  1235. StationsModule.runJob("GET_STATION", { stationId })
  1236. .then(station => {
  1237. next(null, station);
  1238. })
  1239. .catch(next);
  1240. },
  1241. (station, next) => {
  1242. if (!station) return next("Station not found.");
  1243. if (station.paused) return next("That station was already paused.");
  1244. return stationModel.updateOne(
  1245. { _id: stationId },
  1246. { $set: { paused: true, pausedAt: Date.now() } },
  1247. next
  1248. );
  1249. },
  1250. (res, next) => {
  1251. StationsModule.runJob("UPDATE_STATION", { stationId })
  1252. .then(station => {
  1253. next(null, station);
  1254. })
  1255. .catch(next);
  1256. }
  1257. ],
  1258. async err => {
  1259. if (err) {
  1260. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1261. console.log("ERROR", "STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  1262. return cb({ status: "failure", message: err });
  1263. }
  1264. console.log("SUCCESS", "STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  1265. CacheModule.runJob("PUB", {
  1266. channel: "station.pause",
  1267. value: stationId
  1268. });
  1269. NotificationsModule.runJob("UNSCHEDULE", {
  1270. name: `stations.nextSong?id=${stationId}`
  1271. });
  1272. return cb({
  1273. status: "success",
  1274. message: "Successfully paused."
  1275. });
  1276. }
  1277. );
  1278. }),
  1279. /**
  1280. * Resumes a station
  1281. *
  1282. * @param session
  1283. * @param stationId - the station id
  1284. * @param cb
  1285. */
  1286. resume: isOwnerRequired(async (session, stationId, cb) => {
  1287. const stationModel = await DBModule.runJob("GET_MODEL", {
  1288. modelName: "station"
  1289. });
  1290. async.waterfall(
  1291. [
  1292. next => {
  1293. StationsModule.runJob("GET_STATION", { stationId })
  1294. .then(station => {
  1295. next(null, station);
  1296. })
  1297. .catch(next);
  1298. },
  1299. (station, next) => {
  1300. if (!station) return next("Station not found.");
  1301. if (!station.paused) return next("That station is not paused.");
  1302. station.timePaused += Date.now() - station.pausedAt;
  1303. return stationModel.updateOne(
  1304. { _id: stationId },
  1305. {
  1306. $set: { paused: false },
  1307. $inc: { timePaused: Date.now() - station.pausedAt }
  1308. },
  1309. next
  1310. );
  1311. },
  1312. (res, next) => {
  1313. StationsModule.runJob("UPDATE_STATION", { stationId })
  1314. .then(station => {
  1315. next(null, station);
  1316. })
  1317. .catch(next);
  1318. }
  1319. ],
  1320. async err => {
  1321. if (err) {
  1322. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1323. console.log("ERROR", "STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  1324. return cb({ status: "failure", message: err });
  1325. }
  1326. console.log("SUCCESS", "STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  1327. CacheModule.runJob("PUB", {
  1328. channel: "station.resume",
  1329. value: stationId
  1330. });
  1331. return cb({
  1332. status: "success",
  1333. message: "Successfully resumed."
  1334. });
  1335. }
  1336. );
  1337. }),
  1338. /**
  1339. * Removes a station
  1340. *
  1341. * @param session
  1342. * @param stationId - the station id
  1343. * @param cb
  1344. */
  1345. remove: isOwnerRequired(async (session, stationId, cb) => {
  1346. const stationModel = await DBModule.runJob("GET_MODEL", {
  1347. modelName: "station"
  1348. });
  1349. async.waterfall(
  1350. [
  1351. next => {
  1352. stationModel.deleteOne({ _id: stationId }, err => next(err));
  1353. },
  1354. next => {
  1355. CacheModule.runJob("HDEL", { table: "stations", key: stationId }).then(next).catch(next);
  1356. }
  1357. ],
  1358. async err => {
  1359. if (err) {
  1360. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1361. console.log("ERROR", "STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  1362. return cb({ status: "failure", message: err });
  1363. }
  1364. console.log("SUCCESS", "STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  1365. CacheModule.runJob("PUB", {
  1366. channel: "station.remove",
  1367. value: stationId
  1368. });
  1369. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1370. userId: session.userId,
  1371. activityType: "deleted_station",
  1372. payload: [stationId]
  1373. });
  1374. return cb({
  1375. status: "success",
  1376. message: "Successfully removed."
  1377. });
  1378. }
  1379. );
  1380. }),
  1381. /**
  1382. * Create a station
  1383. *
  1384. * @param session
  1385. * @param data - the station data
  1386. * @param cb
  1387. */
  1388. create: isLoginRequired(async (session, data, cb) => {
  1389. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  1390. const stationModel = await DBModule.runJob("GET_MODEL", {
  1391. modelName: "station"
  1392. });
  1393. data.name = data.name.toLowerCase();
  1394. const blacklist = [
  1395. "country",
  1396. "edm",
  1397. "musare",
  1398. "hip-hop",
  1399. "rap",
  1400. "top-hits",
  1401. "todays-hits",
  1402. "old-school",
  1403. "christmas",
  1404. "about",
  1405. "support",
  1406. "staff",
  1407. "help",
  1408. "news",
  1409. "terms",
  1410. "privacy",
  1411. "profile",
  1412. "c",
  1413. "community",
  1414. "tos",
  1415. "login",
  1416. "register",
  1417. "p",
  1418. "official",
  1419. "o",
  1420. "trap",
  1421. "faq",
  1422. "team",
  1423. "donate",
  1424. "buy",
  1425. "shop",
  1426. "forums",
  1427. "explore",
  1428. "settings",
  1429. "admin",
  1430. "auth",
  1431. "reset_password"
  1432. ];
  1433. async.waterfall(
  1434. [
  1435. next => {
  1436. if (!data) return next("Invalid data.");
  1437. return next();
  1438. },
  1439. next => {
  1440. stationModel.findOne(
  1441. {
  1442. $or: [
  1443. { name: data.name },
  1444. {
  1445. displayName: new RegExp(`^${data.displayName}$`, "i")
  1446. }
  1447. ]
  1448. },
  1449. next
  1450. );
  1451. },
  1452. // eslint-disable-next-line consistent-return
  1453. (station, next) => {
  1454. console.log(station);
  1455. if (station) return next("A station with that name or display name already exists.");
  1456. const { name, displayName, description, genres, playlist, type, blacklistedGenres } = data;
  1457. if (type === "official") {
  1458. return userModel.findOne({ _id: session.userId }, (err, user) => {
  1459. if (err) return next(err);
  1460. if (!user) return next("User not found.");
  1461. if (user.role !== "admin") return next("Admin required.");
  1462. return stationModel.create(
  1463. {
  1464. name,
  1465. displayName,
  1466. description,
  1467. type,
  1468. privacy: "private",
  1469. playlist,
  1470. genres,
  1471. blacklistedGenres,
  1472. currentSong: StationsModule.defaultSong
  1473. },
  1474. next
  1475. );
  1476. });
  1477. }
  1478. if (type === "community") {
  1479. if (blacklist.indexOf(name) !== -1)
  1480. return next("That name is blacklisted. Please use a different name.");
  1481. return stationModel.create(
  1482. {
  1483. name,
  1484. displayName,
  1485. description,
  1486. type,
  1487. privacy: "private",
  1488. owner: session.userId,
  1489. queue: [],
  1490. currentSong: null
  1491. },
  1492. next
  1493. );
  1494. }
  1495. }
  1496. ],
  1497. async (err, station) => {
  1498. if (err) {
  1499. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1500. console.log("ERROR", "STATIONS_CREATE", `Creating station failed. "${err}"`);
  1501. return cb({ status: "failure", message: err });
  1502. }
  1503. console.log("SUCCESS", "STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  1504. CacheModule.runJob("PUB", {
  1505. channel: "station.create",
  1506. value: station._id
  1507. });
  1508. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1509. userId: session.userId,
  1510. activityType: "created_station",
  1511. payload: [station._id]
  1512. });
  1513. return cb({
  1514. status: "success",
  1515. message: "Successfully created station."
  1516. });
  1517. }
  1518. );
  1519. }),
  1520. /**
  1521. * Adds song to station queue
  1522. *
  1523. * @param session
  1524. * @param stationId - the station id
  1525. * @param songId - the song id
  1526. * @param cb
  1527. */
  1528. addToQueue: isLoginRequired(async (session, stationId, songId, cb) => {
  1529. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  1530. const stationModel = await DBModule.runJob("GET_MODEL", {
  1531. modelName: "station"
  1532. });
  1533. async.waterfall(
  1534. [
  1535. next => {
  1536. StationsModule.runJob("GET_STATION", { stationId })
  1537. .then(station => {
  1538. next(null, station);
  1539. })
  1540. .catch(next);
  1541. },
  1542. (station, next) => {
  1543. if (!station) return next("Station not found.");
  1544. if (station.locked) {
  1545. return userModel.findOne({ _id: session.userId }, (err, user) => {
  1546. if (user.role !== "admin" && station.owner !== session.userId)
  1547. return next("Only owners and admins can add songs to a locked queue.");
  1548. return next(null, station);
  1549. });
  1550. }
  1551. return next(null, station);
  1552. },
  1553. (station, next) => {
  1554. if (station.type !== "community") return next("That station is not a community station.");
  1555. return StationsModule.runJob("CAN_USER_VIEW_STATION", {
  1556. station,
  1557. userId: session.userId
  1558. })
  1559. .then(canView => {
  1560. if (canView) return next(null, station);
  1561. return next("Insufficient permissions.");
  1562. })
  1563. .catch(err => next(err));
  1564. },
  1565. (station, next) => {
  1566. if (station.currentSong && station.currentSong.songId === songId)
  1567. return next("That song is currently playing.");
  1568. return async.each(
  1569. station.queue,
  1570. (queueSong, next) => {
  1571. if (queueSong.songId === songId) return next("That song is already in the queue.");
  1572. return next();
  1573. },
  1574. err => next(err, station)
  1575. );
  1576. },
  1577. (station, next) => {
  1578. SongsModule.runJob("GET_SONG_FROM_ID", { songId })
  1579. .then(res => {
  1580. if (res.song) return next(null, res.song, station);
  1581. return YouTubeModule.runJob("GET_SONG", { songId })
  1582. .then(response => {
  1583. const { song } = response;
  1584. song.artists = [];
  1585. song.skipDuration = 0;
  1586. song.likes = -1;
  1587. song.dislikes = -1;
  1588. song.thumbnail = "empty";
  1589. song.explicit = false;
  1590. return next(null, song, station);
  1591. })
  1592. .catch(err => next(err));
  1593. })
  1594. .catch(err => next(err));
  1595. },
  1596. (song, station, next) => {
  1597. const { queue } = station;
  1598. song.requestedBy = session.userId;
  1599. queue.push(song);
  1600. let totalDuration = 0;
  1601. queue.forEach(song => {
  1602. totalDuration += song.duration;
  1603. });
  1604. if (totalDuration >= 3600 * 3) return next("The max length of the queue is 3 hours.");
  1605. return next(null, song, station);
  1606. },
  1607. (song, station, next) => {
  1608. const { queue } = station;
  1609. if (queue.length === 0) return next(null, song, station);
  1610. let totalDuration = 0;
  1611. const userId = queue[queue.length - 1].requestedBy;
  1612. station.queue.forEach(song => {
  1613. if (userId === song.requestedBy) {
  1614. totalDuration += song.duration;
  1615. }
  1616. });
  1617. if (totalDuration >= 900) return next("The max length of songs per user is 15 minutes.");
  1618. return next(null, song, station);
  1619. },
  1620. (song, station, next) => {
  1621. const { queue } = station;
  1622. if (queue.length === 0) return next(null, song);
  1623. let totalSongs = 0;
  1624. const userId = queue[queue.length - 1].requestedBy;
  1625. queue.forEach(song => {
  1626. if (userId === song.requestedBy) {
  1627. totalSongs += 1;
  1628. }
  1629. });
  1630. if (totalSongs <= 2) return next(null, song);
  1631. if (totalSongs > 3)
  1632. return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  1633. if (queue[queue.length - 2].requestedBy !== userId || queue[queue.length - 3] !== userId)
  1634. return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  1635. return next(null, song);
  1636. },
  1637. (song, next) => {
  1638. stationModel.updateOne(
  1639. { _id: stationId },
  1640. { $push: { queue: song } },
  1641. { runValidators: true },
  1642. next
  1643. );
  1644. },
  1645. (res, next) => {
  1646. StationsModule.runJob("UPDATE_STATION", { stationId })
  1647. .then(station => {
  1648. next(null, station);
  1649. })
  1650. .catch(next);
  1651. }
  1652. ],
  1653. async err => {
  1654. if (err) {
  1655. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1656. console.log(
  1657. "ERROR",
  1658. "STATIONS_ADD_SONG_TO_QUEUE",
  1659. `Adding song "${songId}" to station "${stationId}" queue failed. "${err}"`
  1660. );
  1661. return cb({ status: "failure", message: err });
  1662. }
  1663. console.log(
  1664. "SUCCESS",
  1665. "STATIONS_ADD_SONG_TO_QUEUE",
  1666. `Added song "${songId}" to station "${stationId}" successfully.`
  1667. );
  1668. CacheModule.runJob("PUB", {
  1669. channel: "station.queueUpdate",
  1670. value: stationId
  1671. });
  1672. return cb({
  1673. status: "success",
  1674. message: "Successfully added song to queue."
  1675. });
  1676. }
  1677. );
  1678. }),
  1679. /**
  1680. * Removes song from station queue
  1681. *
  1682. * @param session
  1683. * @param stationId - the station id
  1684. * @param songId - the song id
  1685. * @param cb
  1686. */
  1687. removeFromQueue: isOwnerRequired(async (session, stationId, songId, cb) => {
  1688. const stationModel = await DBModule.runJob("GET_MODEL", {
  1689. modelName: "station"
  1690. });
  1691. async.waterfall(
  1692. [
  1693. next => {
  1694. if (!songId) return next("Invalid song id.");
  1695. return StationsModule.runJob("GET_STATION", { stationId })
  1696. .then(station => {
  1697. next(null, station);
  1698. })
  1699. .catch(next);
  1700. },
  1701. (station, next) => {
  1702. if (!station) return next("Station not found.");
  1703. if (station.type !== "community") return next("Station is not a community station.");
  1704. return async.each(
  1705. station.queue,
  1706. (queueSong, next) => {
  1707. if (queueSong.songId === songId) return next(true);
  1708. return next();
  1709. },
  1710. err => {
  1711. if (err === true) return next();
  1712. return next("Song is not currently in the queue.");
  1713. }
  1714. );
  1715. },
  1716. next => {
  1717. stationModel.updateOne({ _id: stationId }, { $pull: { queue: { songId } } }, next);
  1718. },
  1719. (res, next) => {
  1720. StationsModule.runJob("UPDATE_STATION", { stationId })
  1721. .then(station => {
  1722. next(null, station);
  1723. })
  1724. .catch(next);
  1725. }
  1726. ],
  1727. async err => {
  1728. if (err) {
  1729. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1730. console.log(
  1731. "ERROR",
  1732. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1733. `Removing song "${songId}" from station "${stationId}" queue failed. "${err}"`
  1734. );
  1735. return cb({ status: "failure", message: err });
  1736. }
  1737. console.log(
  1738. "SUCCESS",
  1739. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1740. `Removed song "${songId}" from station "${stationId}" successfully.`
  1741. );
  1742. CacheModule.runJob("PUB", {
  1743. channel: "station.queueUpdate",
  1744. value: stationId
  1745. });
  1746. return cb({
  1747. status: "success",
  1748. message: "Successfully removed song from queue."
  1749. });
  1750. }
  1751. );
  1752. }),
  1753. /**
  1754. * Gets the queue from a station
  1755. *
  1756. * @param {object} session - user session
  1757. * @param {string} stationId - the station id
  1758. * @param {Function} cb - callback
  1759. */
  1760. getQueue: (session, stationId, cb) => {
  1761. async.waterfall(
  1762. [
  1763. next => {
  1764. StationsModule.runJob("GET_STATION", { stationId })
  1765. .then(station => {
  1766. next(null, station);
  1767. })
  1768. .catch(next);
  1769. },
  1770. (station, next) => {
  1771. if (!station) return next("Station not found.");
  1772. if (station.type !== "community") return next("Station is not a community station.");
  1773. return next(null, station);
  1774. },
  1775. (station, next) => {
  1776. StationsModule.runJob("CAN_USER_VIEW_STATION", {
  1777. station,
  1778. userId: session.userId
  1779. })
  1780. .then(canView => {
  1781. if (canView) return next(null, station);
  1782. return next("Insufficient permissions.");
  1783. })
  1784. .catch(err => next(err));
  1785. }
  1786. ],
  1787. async (err, station) => {
  1788. if (err) {
  1789. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1790. console.log(
  1791. "ERROR",
  1792. "STATIONS_GET_QUEUE",
  1793. `Getting queue for station "${stationId}" failed. "${err}"`
  1794. );
  1795. return cb({ status: "failure", message: err });
  1796. }
  1797. console.log("SUCCESS", "STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  1798. return cb({
  1799. status: "success",
  1800. message: "Successfully got queue.",
  1801. queue: station.queue
  1802. });
  1803. }
  1804. );
  1805. },
  1806. /**
  1807. * Selects a private playlist for a station
  1808. *
  1809. * @param session
  1810. * @param stationId - the station id
  1811. * @param playlistId - the private playlist id
  1812. * @param cb
  1813. */
  1814. selectPrivatePlaylist: isOwnerRequired(async (session, stationId, playlistId, cb) => {
  1815. const stationModel = await DBModule.runJob("GET_MODEL", {
  1816. modelName: "station"
  1817. });
  1818. const playlistModel = await DBModule.runJob("GET_MODEL", {
  1819. modelName: "playlist"
  1820. });
  1821. async.waterfall(
  1822. [
  1823. next => {
  1824. StationsModule.runJob("GET_STATION", { stationId })
  1825. .then(station => {
  1826. next(null, station);
  1827. })
  1828. .catch(next);
  1829. },
  1830. (station, next) => {
  1831. if (!station) return next("Station not found.");
  1832. if (station.type !== "community") return next("Station is not a community station.");
  1833. if (station.privatePlaylist === playlistId)
  1834. return next("That private playlist is already selected.");
  1835. return playlistModel.findOne({ _id: playlistId }, next);
  1836. },
  1837. (playlist, next) => {
  1838. if (!playlist) return next("Playlist not found.");
  1839. const currentSongIndex = playlist.songs.length > 0 ? playlist.songs.length - 1 : 0;
  1840. return stationModel.updateOne(
  1841. { _id: stationId },
  1842. {
  1843. $set: {
  1844. privatePlaylist: playlistId,
  1845. currentSongIndex
  1846. }
  1847. },
  1848. { runValidators: true },
  1849. next
  1850. );
  1851. },
  1852. (res, next) => {
  1853. StationsModule.runJob("UPDATE_STATION", { stationId })
  1854. .then(station => {
  1855. next(null, station);
  1856. })
  1857. .catch(next);
  1858. }
  1859. ],
  1860. async (err, station) => {
  1861. if (err) {
  1862. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1863. console.log(
  1864. "ERROR",
  1865. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  1866. `Selecting private playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  1867. );
  1868. return cb({ status: "failure", message: err });
  1869. }
  1870. console.log(
  1871. "SUCCESS",
  1872. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  1873. `Selected private playlist "${playlistId}" for station "${stationId}" successfully.`
  1874. );
  1875. NotificationsModule.runJob("UNSCHEDULE", {
  1876. name: `stations.nextSong?id${stationId}`
  1877. });
  1878. if (!station.partyMode) StationsModule.runJob("SKIP_STATION", { stationId });
  1879. CacheModule.runJob("PUB", {
  1880. channel: "privatePlaylist.selected",
  1881. value: {
  1882. playlistId,
  1883. stationId
  1884. }
  1885. });
  1886. return cb({
  1887. status: "success",
  1888. message: "Successfully selected playlist."
  1889. });
  1890. }
  1891. );
  1892. }),
  1893. favoriteStation: isLoginRequired(async (session, stationId, cb) => {
  1894. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  1895. async.waterfall(
  1896. [
  1897. next => {
  1898. StationsModule.runJob("GET_STATION", { stationId })
  1899. .then(station => {
  1900. next(null, station);
  1901. })
  1902. .catch(next);
  1903. },
  1904. (station, next) => {
  1905. if (!station) return next("Station not found.");
  1906. return StationsModule.runJob("CAN_USER_VIEW_STATION", {
  1907. station,
  1908. userId: session.userId
  1909. })
  1910. .then(canView => {
  1911. if (canView) return next();
  1912. return next("Insufficient permissions.");
  1913. })
  1914. .catch(err => next(err));
  1915. },
  1916. next => {
  1917. userModel.updateOne({ _id: session.userId }, { $addToSet: { favoriteStations: stationId } }, next);
  1918. },
  1919. (res, next) => {
  1920. if (res.nModified === 0) return next("The station was already favorited.");
  1921. return next();
  1922. }
  1923. ],
  1924. async err => {
  1925. if (err) {
  1926. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1927. console.log("ERROR", "FAVORITE_STATION", `Favoriting station "${stationId}" failed. "${err}"`);
  1928. return cb({ status: "failure", message: err });
  1929. }
  1930. console.log("SUCCESS", "FAVORITE_STATION", `Favorited station "${stationId}" successfully.`);
  1931. CacheModule.runJob("PUB", {
  1932. channel: "user.favoritedStation",
  1933. value: {
  1934. userId: session.userId,
  1935. stationId
  1936. }
  1937. });
  1938. return cb({
  1939. status: "success",
  1940. message: "Succesfully favorited station."
  1941. });
  1942. }
  1943. );
  1944. }),
  /**
   * Removes a station from the requesting user's favorite stations.
   *
   * Fails when the update modified nothing (the station was not a favorite).
   * On success the change is published on the "user.unfavoritedStation"
   * channel.
   *
   * @param session - the session of the requesting user
   * @param stationId - the id of the station to unfavorite
   * @param cb - invoked once with `{ status, message }`
   */
  unfavoriteStation: isLoginRequired(async (session, stationId, cb) => {
    const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });

    // Step 1: pull the station id out of the user's favorites.
    const removeFavorite = done => {
      const filter = { _id: session.userId };
      const changes = { $pull: { favoriteStations: stationId } };
      userModel.updateOne(filter, changes, done);
    };

    // Step 2: an update that modified nothing means it was never a favorite.
    const requireChange = (writeResult, done) => {
      if (writeResult.nModified === 0) return done("The station wasn't favorited.");
      return done();
    };

    // Final handler: log, publish the change, and report.
    const finish = async failure => {
      if (failure) {
        const reason = await UtilsModule.runJob("GET_ERROR", { error: failure });
        console.log("ERROR", "UNFAVORITE_STATION", `Unfavoriting station "${stationId}" failed. "${reason}"`);
        return cb({ status: "failure", message: reason });
      }

      console.log("SUCCESS", "UNFAVORITE_STATION", `Unfavorited station "${stationId}" successfully.`);
      CacheModule.runJob("PUB", {
        channel: "user.unfavoritedStation",
        value: { userId: session.userId, stationId }
      });

      return cb({
        status: "success",
        message: "Succesfully unfavorited station."
      });
    };

    async.waterfall([removeFavorite, requireChange], finish);
  })
  1978. };