stations.js 59 KB


  1. import async from "async";
  2. import { isLoginRequired, isOwnerRequired } from "./hooks";
  3. import moduleManager from "../../index";
  4. const DBModule = moduleManager.modules.db;
  5. const UtilsModule = moduleManager.modules.utils;
  6. const IOModule = moduleManager.modules.io;
  7. const SongsModule = moduleManager.modules.songs;
  8. const CacheModule = moduleManager.modules.cache;
  9. const NotificationsModule = moduleManager.modules.notifications;
  10. const StationsModule = moduleManager.modules.stations;
  11. const ActivitiesModule = moduleManager.modules.activities;
  12. const YouTubeModule = moduleManager.modules.youtube;
  // On "station.updateUsers", send the station's entry in StationsModule.usersPerStation
  // (an empty array when there is none) to everyone in that station's room.
  CacheModule.runJob("SUB", {
    channel: "station.updateUsers",
    cb: stationId => {
      const room = `station.${stationId}`;
      const users = StationsModule.usersPerStation[stationId] || [];
      IOModule.runJob("EMIT_TO_ROOM", { room, args: ["event:users.updated", users] });
    }
  });
  // On "station.updateUserCount", push the station's user count to the station's own room and
  // to the home room. Public stations are broadcast to the whole home room; for any other
  // privacy only admins and the owner of a community station receive the update.
  CacheModule.runJob("SUB", {
    channel: "station.updateUserCount",
    cb: stationId => {
      const count = StationsModule.usersPerStationCount[stationId] || 0;

      IOModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:userCount.updated", count]
      });

      StationsModule.runJob("GET_STATION", { stationId })
        .then(async station => {
          if (station.privacy === "public") {
            IOModule.runJob("EMIT_TO_ROOM", {
              room: "home",
              args: ["event:userCount.updated", stationId, count]
            });
            return;
          }

          const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" });
          // The model does not depend on the socket, so resolve it once instead of once per socket.
          const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });

          Object.keys(sockets).forEach(socketKey => {
            const socket = sockets[socketKey];
            const { session } = socket;
            // Sockets without a session cannot belong to an admin or owner.
            if (!session || !session.sessionId) return;

            CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId })
              .then(cachedSession => {
                if (!cachedSession) return;
                userModel.findOne({ _id: cachedSession.userId }, (err, user) => {
                  // A failed lookup, or a session whose user no longer exists, must not throw
                  // inside the database callback.
                  if (err || !user) return;
                  const isAdmin = user.role === "admin";
                  const isOwner = station.type === "community" && station.owner === cachedSession.userId;
                  if (isAdmin || isOwner) socket.emit("event:userCount.updated", stationId, count);
                });
              })
              .catch(console.log);
          });
        })
        // Same logging approach as the station.resume subscriber; avoids unhandled rejections.
        .catch(console.log);
    }
  });
  // On "station.queueLockToggled", tell the station's room the new `locked` value.
  CacheModule.runJob("SUB", {
    channel: "station.queueLockToggled",
    cb: data => {
      const room = `station.${data.stationId}`;
      const args = ["event:queueLockToggled", data.locked];
      IOModule.runJob("EMIT_TO_ROOM", { room, args });
    }
  });
  // On "station.updatePartyMode", tell the station's room the new `partyMode` value.
  CacheModule.runJob("SUB", {
    channel: "station.updatePartyMode",
    cb: data => {
      const room = `station.${data.stationId}`;
      const args = ["event:partyMode.updated", data.partyMode];
      IOModule.runJob("EMIT_TO_ROOM", { room, args });
    }
  });
  // On "privatePlaylist.selected", tell the station's room which playlist id was selected.
  CacheModule.runJob("SUB", {
    channel: "privatePlaylist.selected",
    cb: data => {
      const room = `station.${data.stationId}`;
      const args = ["event:privatePlaylist.selected", data.playlistId];
      IOModule.runJob("EMIT_TO_ROOM", { room, args });
    }
  });
  // On "privatePlaylist.deselected", tell the station's room that no private playlist is selected.
  CacheModule.runJob("SUB", {
    channel: "privatePlaylist.deselected",
    cb: data => {
      const room = `station.${data.stationId}`;
      IOModule.runJob("EMIT_TO_ROOM", { room, args: ["event:privatePlaylist.deselected"] });
    }
  });
  // On "station.pause", send the station's `pausedAt` to the station's room and notify every
  // home-room socket that GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION reports as allowed.
  CacheModule.runJob("SUB", {
    channel: "station.pause",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId }).then(station => {
        IOModule.runJob("EMIT_TO_ROOM", {
          room: `station.${stationId}`,
          args: ["event:stations.pause", { pausedAt: station.pausedAt }]
        });

        StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
          room: `home`,
          station
        })
          .then(response => {
            const { socketsThatCan } = response;
            socketsThatCan.forEach(socket => {
              socket.emit("event:station.pause", { stationId });
            });
          })
          // Matches the station.resume subscriber: a failed socket lookup is logged instead of
          // becoming an unhandled rejection.
          .catch(console.log);
      });
    }
  });
  // On "station.resume", send the station's `timePaused` to the station's room and notify every
  // home-room socket that is allowed to know about the station. Socket-lookup failures are logged.
  CacheModule.runJob("SUB", {
    channel: "station.resume",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId }).then(station => {
        const resumePayload = { timePaused: station.timePaused };
        IOModule.runJob("EMIT_TO_ROOM", {
          room: `station.${stationId}`,
          args: ["event:stations.resume", resumePayload]
        });

        const lookup = StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
          room: `home`,
          station
        });
        lookup
          .then(({ socketsThatCan }) => {
            socketsThatCan.forEach(allowedSocket => {
              allowedSocket.emit("event:station.resume", { stationId });
            });
          })
          .catch(console.log);
      });
    }
  });
  // On "station.privacyUpdate", update the home page when a station's privacy actually changed:
  //  - now public: announce it to the whole home room as a newly created station;
  //  - otherwise: send the new privacy to the sockets allowed to know about the station and,
  //    if the station used to be public, tell all other sockets that it was removed.
  CacheModule.runJob("SUB", {
    channel: "station.privacyUpdate",
    cb: message => {
      const { stationId, previousPrivacy } = message;

      StationsModule.runJob("GET_STATION", { stationId }).then(station => {
        if (previousPrivacy === station.privacy) return;

        if (station.privacy === "public") {
          // Station became public
          IOModule.runJob("EMIT_TO_ROOM", {
            room: "home",
            args: ["event:stations.created", station]
          });
          return;
        }

        // Station is hidden now; it either just became hidden or was hidden already.
        const becameHidden = previousPrivacy === "public";
        StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
          room: `home`,
          station
        }).then(result => {
          const { socketsThatCan, socketsThatCannot } = result;
          socketsThatCan.forEach(allowedSocket => {
            allowedSocket.emit("event:station.updatePrivacy", { stationId, privacy: station.privacy });
          });
          if (becameHidden) {
            socketsThatCannot.forEach(deniedSocket => {
              deniedSocket.emit("event:station.removed", { stationId });
            });
          }
        });
      });
    }
  });
  // On "station.nameUpdate", send the station's current name to every home-room socket that is
  // allowed to know about the station.
  CacheModule.runJob("SUB", {
    channel: "station.nameUpdate",
    cb: message => {
      const { stationId } = message;
      StationsModule.runJob("GET_STATION", { stationId }).then(station => {
        const lookup = StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
          room: `home`,
          station
        });
        lookup.then(({ socketsThatCan }) => {
          socketsThatCan.forEach(allowedSocket => {
            allowedSocket.emit("event:station.updateName", { stationId, name: station.name });
          });
        });
      });
    }
  });
  // On "station.displayNameUpdate", send the station's current display name to every home-room
  // socket that is allowed to know about the station.
  CacheModule.runJob("SUB", {
    channel: "station.displayNameUpdate",
    cb: message => {
      const { stationId } = message;
      StationsModule.runJob("GET_STATION", { stationId }).then(station => {
        const lookup = StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
          room: `home`,
          station
        });
        lookup.then(({ socketsThatCan }) => {
          socketsThatCan.forEach(allowedSocket => {
            allowedSocket.emit("event:station.updateDisplayName", {
              stationId,
              displayName: station.displayName
            });
          });
        });
      });
    }
  });
  // On "station.descriptionUpdate", send the station's current description to every home-room
  // socket that is allowed to know about the station.
  CacheModule.runJob("SUB", {
    channel: "station.descriptionUpdate",
    cb: message => {
      const { stationId } = message;
      StationsModule.runJob("GET_STATION", { stationId }).then(station => {
        const lookup = StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
          room: `home`,
          station
        });
        lookup.then(({ socketsThatCan }) => {
          socketsThatCan.forEach(allowedSocket => {
            allowedSocket.emit("event:station.updateDescription", {
              stationId,
              description: station.description
            });
          });
        });
      });
    }
  });
  // On "station.queueUpdate", load the station and send its queue to the station's room.
  CacheModule.runJob("SUB", {
    channel: "station.queueUpdate",
    cb: stationId => {
      const room = `station.${stationId}`;
      StationsModule.runJob("GET_STATION", { stationId }).then(station => {
        IOModule.runJob("EMIT_TO_ROOM", { room, args: ["event:queue.update", station.queue] });
      });
    }
  });
  // On "station.voteSkipSong", tell the station's room that a skip vote was cast.
  CacheModule.runJob("SUB", {
    channel: "station.voteSkipSong",
    cb: stationId => {
      const room = `station.${stationId}`;
      IOModule.runJob("EMIT_TO_ROOM", { room, args: ["event:song.voteSkipSong"] });
    }
  });
  // On "station.remove", announce the removal to the station's own room, to the home room and
  // to the admin stations room. (The leftover `console.log(111, "REMOVED")` debug statement
  // has been dropped.)
  CacheModule.runJob("SUB", {
    channel: "station.remove",
    cb: stationId => {
      IOModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:stations.remove"]
      });
      IOModule.runJob("EMIT_TO_ROOM", {
        room: `home`,
        args: ["event:station.removed", { stationId }]
      });
      IOModule.runJob("EMIT_TO_ROOM", {
        room: "admin.stations",
        args: ["event:admin.station.added".replace("added", "removed"), stationId]
      });
    }
  });
  // On "station.create", initialize the new station and announce it: always to the admin
  // stations room, and on the home page to everyone for public stations or, otherwise, only to
  // admins and the owner of a community station.
  CacheModule.runJob("SUB", {
    channel: "station.create",
    cb: async stationId => {
      const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });

      StationsModule.runJob("INITIALIZE_STATION", { stationId })
        .then(async response => {
          const { station } = response;
          station.userCount = StationsModule.usersPerStationCount[stationId] || 0;

          IOModule.runJob("EMIT_TO_ROOM", {
            room: "admin.stations",
            args: ["event:admin.station.added", station]
          });

          // TODO If community, check if on whitelist
          if (station.privacy === "public") {
            IOModule.runJob("EMIT_TO_ROOM", {
              room: "home",
              args: ["event:stations.created", station]
            });
            return;
          }

          const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" });
          Object.keys(sockets).forEach(socketKey => {
            const socket = sockets[socketKey];
            const { session } = socket;
            // Sockets without a session cannot belong to an admin or owner.
            if (!session || !session.sessionId) return;

            CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId })
              .then(cachedSession => {
                if (!cachedSession) return;
                userModel.findOne({ _id: cachedSession.userId }, (err, user) => {
                  // A failed lookup, or a session whose user no longer exists, must not throw
                  // inside the database callback.
                  if (err || !user) return;
                  const isAdmin = user.role === "admin";
                  const isOwner = station.type === "community" && station.owner === cachedSession.userId;
                  if (isAdmin || isOwner) socket.emit("event:stations.created", station);
                });
              })
              .catch(console.log);
          });
        })
        // Same logging approach as the station.resume subscriber; avoids unhandled rejections.
        .catch(console.log);
    }
  });
  319. export default {
  320. /**
  321. * Get a list of all the stations
  322. *
  323. * @param {object} session - user session
  324. * @param {Function} cb - callback
  325. */
  326. index(session, cb) {
  327. async.waterfall(
  328. [
  329. next => {
  330. CacheModule.runJob("HGETALL", { table: "stations" }, this).then(stations => {
  331. next(null, stations);
  332. });
  333. },
  334. (items, next) => {
  335. const filteredStations = [];
  336. async.each(
  337. items,
  338. (station, next) => {
  339. async.waterfall(
  340. [
  341. next => {
  342. StationsModule.runJob(
  343. "HAS_USER_FAVORITED_STATION",
  344. {
  345. userId: session.userId,
  346. stationId: station._id
  347. },
  348. this
  349. )
  350. .then(isStationFavorited => {
  351. station.isFavorited = isStationFavorited;
  352. return next();
  353. })
  354. .catch(err => next(err));
  355. },
  356. next => {
  357. StationsModule.runJob(
  358. "CAN_USER_VIEW_STATION",
  359. {
  360. station,
  361. userId: session.userId,
  362. hideUnlisted: true
  363. },
  364. this
  365. )
  366. .then(exists => next(null, exists))
  367. .catch(next);
  368. }
  369. ],
  370. (err, exists) => {
  371. if (err) this.log(err);
  372. station.userCount = StationsModule.usersPerStationCount[station._id] || 0;
  373. if (exists) filteredStations.push(station);
  374. next();
  375. }
  376. );
  377. },
  378. () => next(null, filteredStations)
  379. );
  380. }
  381. ],
  382. async (err, stations) => {
  383. if (err) {
  384. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  385. this.log("ERROR", "STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  386. return cb({ status: "failure", message: err });
  387. }
  388. this.log("SUCCESS", "STATIONS_INDEX", `Indexing stations successful.`, false);
  389. return cb({ status: "success", stations });
  390. }
  391. );
  392. },
  393. /**
  394. * Obtains basic metadata of a station in order to format an activity
  395. *
  396. * @param {object} session - user session
  397. * @param {string} stationId - the station id
  398. * @param {Function} cb - callback
  399. */
  400. getStationForActivity(session, stationId, cb) {
  401. async.waterfall(
  402. [
  403. next => {
  404. StationsModule.runJob("GET_STATION", { stationId }, this)
  405. .then(station => {
  406. next(null, station);
  407. })
  408. .catch(next);
  409. }
  410. ],
  411. async (err, station) => {
  412. if (err) {
  413. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  414. this.log(
  415. "ERROR",
  416. "STATIONS_GET_STATION_FOR_ACTIVITY",
  417. `Failed to obtain metadata of station ${stationId} for activity formatting. "${err}"`
  418. );
  419. return cb({ status: "failure", message: err });
  420. }
  421. this.log(
  422. "SUCCESS",
  423. "STATIONS_GET_STATION_FOR_ACTIVITY",
  424. `Obtained metadata of station ${stationId} for activity formatting successfully.`
  425. );
  426. return cb({
  427. status: "success",
  428. data: {
  429. title: station.displayName,
  430. thumbnail: station.currentSong ? station.currentSong.thumbnail : ""
  431. }
  432. });
  433. }
  434. );
  435. },
  436. /**
  437. * Verifies that a station exists
  438. *
  439. * @param {object} session - user session
  440. * @param {string} stationName - the station name
  441. * @param {Function} cb - callback
  442. */
  443. existsByName(session, stationName, cb) {
  444. async.waterfall(
  445. [
  446. next => {
  447. StationsModule.runJob("GET_STATION_BY_NAME", { stationName }, this)
  448. .then(station => {
  449. next(null, station);
  450. })
  451. .catch(next);
  452. },
  453. (station, next) => {
  454. if (!station) return next(null, false);
  455. return StationsModule.runJob(
  456. "CAN_USER_VIEW_STATION",
  457. {
  458. station,
  459. userId: session.userId
  460. },
  461. this
  462. )
  463. .then(exists => {
  464. next(null, exists);
  465. })
  466. .catch(next);
  467. }
  468. ],
  469. async (err, exists) => {
  470. if (err) {
  471. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  472. this.log(
  473. "ERROR",
  474. "STATION_EXISTS_BY_NAME",
  475. `Checking if station "${stationName}" exists failed. "${err}"`
  476. );
  477. return cb({ status: "failure", message: err });
  478. }
  479. this.log(
  480. "SUCCESS",
  481. "STATION_EXISTS_BY_NAME",
  482. `Station "${stationName}" exists successfully.` /* , false */
  483. );
  484. return cb({ status: "success", exists });
  485. }
  486. );
  487. },
  488. /**
  489. * Gets the official playlist for a station
  490. *
  491. * @param {object} session - user session
  492. * @param {string} stationId - the station id
  493. * @param {Function} cb - callback
  494. */
  495. getPlaylist(session, stationId, cb) {
  496. async.waterfall(
  497. [
  498. next => {
  499. StationsModule.runJob("GET_STATION", { stationId }, this)
  500. .then(station => {
  501. next(null, station);
  502. })
  503. .catch(next);
  504. },
  505. (station, next) => {
  506. StationsModule.runJob(
  507. "CAN_USER_VIEW_STATION",
  508. {
  509. station,
  510. userId: session.userId
  511. },
  512. this
  513. )
  514. .then(canView => {
  515. if (canView) return next(null, station);
  516. return next("Insufficient permissions.");
  517. })
  518. .catch(err => next(err));
  519. },
  520. (station, next) => {
  521. if (!station) return next("Station not found.");
  522. if (station.type !== "official") return next("This is not an official station.");
  523. return next();
  524. },
  525. next => {
  526. CacheModule.runJob(
  527. "HGET",
  528. {
  529. table: "officialPlaylists",
  530. key: stationId
  531. },
  532. this
  533. )
  534. .then(playlist => {
  535. next(null, playlist);
  536. })
  537. .catch(next);
  538. },
  539. (playlist, next) => {
  540. if (!playlist) return next("Playlist not found.");
  541. return next(null, playlist);
  542. }
  543. ],
  544. async (err, playlist) => {
  545. if (err) {
  546. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  547. this.log(
  548. "ERROR",
  549. "STATIONS_GET_PLAYLIST",
  550. `Getting playlist for station "${stationId}" failed. "${err}"`
  551. );
  552. return cb({ status: "failure", message: err });
  553. }
  554. this.log(
  555. "SUCCESS",
  556. "STATIONS_GET_PLAYLIST",
  557. `Got playlist for station "${stationId}" successfully.`,
  558. false
  559. );
  560. return cb({ status: "success", data: playlist.songs });
  561. }
  562. );
  563. },
  564. /**
  565. * Joins the station by its name
  566. *
  567. * @param {object} session - user session
  568. * @param {string} stationName - the station name
  569. * @param {Function} cb - callback
  570. */
  571. join(session, stationName, cb) {
  572. async.waterfall(
  573. [
  574. next => {
  575. StationsModule.runJob("GET_STATION_BY_NAME", { stationName }, this)
  576. .then(station => {
  577. next(null, station);
  578. })
  579. .catch(next);
  580. },
  581. (station, next) => {
  582. if (!station) return next("Station not found.");
  583. return StationsModule.runJob(
  584. "CAN_USER_VIEW_STATION",
  585. {
  586. station,
  587. userId: session.userId
  588. },
  589. this
  590. )
  591. .then(canView => {
  592. if (!canView) next("Not allowed to join station.");
  593. else next(null, station);
  594. })
  595. .catch(err => next(err));
  596. },
  597. (station, next) => {
  598. IOModule.runJob("SOCKET_JOIN_ROOM", {
  599. socketId: session.socketId,
  600. room: `station.${station._id}`
  601. });
  602. const data = {
  603. _id: station._id,
  604. type: station.type,
  605. currentSong: station.currentSong,
  606. startedAt: station.startedAt,
  607. paused: station.paused,
  608. timePaused: station.timePaused,
  609. pausedAt: station.pausedAt,
  610. description: station.description,
  611. displayName: station.displayName,
  612. privacy: station.privacy,
  613. locked: station.locked,
  614. partyMode: station.partyMode,
  615. owner: station.owner,
  616. privatePlaylist: station.privatePlaylist,
  617. genres: station.genres,
  618. blacklistedGenres: station.blacklistedGenres
  619. };
  620. StationsModule.userList[session.socketId] = station._id;
  621. next(null, data);
  622. },
  623. (data, next) => {
  624. data = JSON.parse(JSON.stringify(data));
  625. data.userCount = StationsModule.usersPerStationCount[data._id] || 0;
  626. data.users = StationsModule.usersPerStation[data._id] || [];
  627. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  628. IOModule.runJob("SOCKET_JOIN_SONG_ROOM", {
  629. socketId: session.socketId,
  630. room: `song.${data.currentSong.songId}`
  631. });
  632. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  633. return SongsModule.runJob(
  634. "GET_SONG_FROM_ID",
  635. {
  636. songId: data.currentSong.songId
  637. },
  638. this
  639. )
  640. .then(response => {
  641. const { song } = response;
  642. if (song) {
  643. data.currentSong.likes = song.likes;
  644. data.currentSong.dislikes = song.dislikes;
  645. } else {
  646. data.currentSong.likes = -1;
  647. data.currentSong.dislikes = -1;
  648. }
  649. })
  650. .catch(() => {
  651. data.currentSong.likes = -1;
  652. data.currentSong.dislikes = -1;
  653. })
  654. .finally(() => next(null, data));
  655. },
  656. (data, next) =>
  657. StationsModule.runJob(
  658. "HAS_USER_FAVORITED_STATION",
  659. {
  660. userId: session.userId,
  661. stationId: data._id
  662. },
  663. this
  664. )
  665. .then(isStationFavorited => {
  666. data.isFavorited = isStationFavorited;
  667. return next(null, data);
  668. })
  669. .catch(err => next(err))
  670. ],
  671. async (err, data) => {
  672. if (err) {
  673. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  674. this.log("ERROR", "STATIONS_JOIN", `Joining station "${stationName}" failed. "${err}"`);
  675. return cb({ status: "failure", message: err });
  676. }
  677. this.log("SUCCESS", "STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  678. return cb({ status: "success", data });
  679. }
  680. );
  681. },
  682. /**
  683. * Toggles if a station is locked
  684. *
  685. * @param session
  686. * @param stationId - the station id
  687. * @param cb
  688. */
  689. toggleLock: isOwnerRequired(async function toggleLock(session, stationId, cb) {
  690. const stationModel = await DBModule.runJob(
  691. "GET_MODEL",
  692. {
  693. modelName: "station"
  694. },
  695. this
  696. );
  697. async.waterfall(
  698. [
  699. next => {
  700. StationsModule.runJob("GET_STATION", { stationId }, this)
  701. .then(station => {
  702. next(null, station);
  703. })
  704. .catch(next);
  705. },
  706. (station, next) => {
  707. stationModel.updateOne({ _id: stationId }, { $set: { locked: !station.locked } }, next);
  708. },
  709. (res, next) => {
  710. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  711. .then(station => {
  712. next(null, station);
  713. })
  714. .catch(next);
  715. }
  716. ],
  717. async (err, station) => {
  718. if (err) {
  719. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  720. this.log(
  721. "ERROR",
  722. "STATIONS_UPDATE_LOCKED_STATUS",
  723. `Toggling the queue lock for station "${stationId}" failed. "${err}"`
  724. );
  725. return cb({ status: "failure", message: err });
  726. }
  727. this.log(
  728. "SUCCESS",
  729. "STATIONS_UPDATE_LOCKED_STATUS",
  730. `Toggled the queue lock for station "${stationId}" successfully to "${station.locked}".`
  731. );
  732. CacheModule.runJob("PUB", {
  733. channel: "station.queueLockToggled",
  734. value: {
  735. stationId,
  736. locked: station.locked
  737. }
  738. });
  739. return cb({ status: "success", data: station.locked });
  740. }
  741. );
  742. }),
  743. /**
  744. * Votes to skip a station
  745. *
  746. * @param session
  747. * @param stationId - the station id
  748. * @param cb
  749. */
  750. voteSkip: isLoginRequired(async function voteSkip(session, stationId, cb) {
  751. const stationModel = await DBModule.runJob(
  752. "GET_MODEL",
  753. {
  754. modelName: "station"
  755. },
  756. this
  757. );
  758. let skipVotes = 0;
  759. let shouldSkip = false;
  760. async.waterfall(
  761. [
  762. next => {
  763. StationsModule.runJob("GET_STATION", { stationId }, this)
  764. .then(station => {
  765. next(null, station);
  766. })
  767. .catch(next);
  768. },
  769. (station, next) => {
  770. if (!station) return next("Station not found.");
  771. return StationsModule.runJob(
  772. "CAN_USER_VIEW_STATION",
  773. {
  774. station,
  775. userId: session.userId
  776. },
  777. this
  778. )
  779. .then(canView => {
  780. if (canView) return next(null, station);
  781. return next("Insufficient permissions.");
  782. })
  783. .catch(err => next(err));
  784. },
  785. (station, next) => {
  786. if (!station.currentSong) return next("There is currently no song to skip.");
  787. if (station.currentSong.skipVotes.indexOf(session.userId) !== -1)
  788. return next("You have already voted to skip this song.");
  789. return next(null, station);
  790. },
  791. (station, next) => {
  792. stationModel.updateOne(
  793. { _id: stationId },
  794. { $push: { "currentSong.skipVotes": session.userId } },
  795. next
  796. );
  797. },
  798. (res, next) => {
  799. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  800. .then(station => {
  801. next(null, station);
  802. })
  803. .catch(next);
  804. },
  805. (station, next) => {
  806. if (!station) return next("Station not found.");
  807. return next(null, station);
  808. },
  809. (station, next) => {
  810. skipVotes = station.currentSong.skipVotes.length;
  811. IOModule.runJob(
  812. "GET_ROOM_SOCKETS",
  813. {
  814. room: `station.${stationId}`
  815. },
  816. this
  817. )
  818. .then(sockets => {
  819. next(null, sockets);
  820. })
  821. .catch(next);
  822. },
  823. (sockets, next) => {
  824. if (sockets.length <= skipVotes) shouldSkip = true;
  825. next();
  826. }
  827. ],
  828. async err => {
  829. if (err) {
  830. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  831. this.log("ERROR", "STATIONS_VOTE_SKIP", `Vote skipping station "${stationId}" failed. "${err}"`);
  832. return cb({ status: "failure", message: err });
  833. }
  834. this.log("SUCCESS", "STATIONS_VOTE_SKIP", `Vote skipping "${stationId}" successful.`);
  835. CacheModule.runJob("PUB", {
  836. channel: "station.voteSkipSong",
  837. value: stationId
  838. });
  839. if (shouldSkip) StationsModule.runJob("SKIP_STATION", { stationId });
  840. return cb({
  841. status: "success",
  842. message: "Successfully voted to skip the song."
  843. });
  844. }
  845. );
  846. }),
  847. /**
  848. * Force skips a station
  849. *
  850. * @param session
  851. * @param stationId - the station id
  852. * @param cb
  853. */
  854. forceSkip: isOwnerRequired(function forceSkip(session, stationId, cb) {
  855. async.waterfall(
  856. [
  857. next => {
  858. StationsModule.runJob("GET_STATION", { stationId }, this)
  859. .then(station => {
  860. next(null, station);
  861. })
  862. .catch(next);
  863. },
  864. (station, next) => {
  865. if (!station) return next("Station not found.");
  866. return next();
  867. }
  868. ],
  869. async err => {
  870. if (err) {
  871. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  872. this.log("ERROR", "STATIONS_FORCE_SKIP", `Force skipping station "${stationId}" failed. "${err}"`);
  873. return cb({ status: "failure", message: err });
  874. }
  875. StationsModule.runJob("SKIP_STATION", { stationId });
  876. this.log("SUCCESS", "STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  877. return cb({
  878. status: "success",
  879. message: "Successfully skipped station."
  880. });
  881. }
  882. );
  883. }),
  884. /**
  885. * Leaves the user's current station
  886. *
  887. * @param {object} session - user session
  888. * @param {string} stationId - id of station to leave
  889. * @param {Function} cb - callback
  890. */
  891. leave(session, stationId, cb) {
  892. async.waterfall(
  893. [
  894. next => {
  895. StationsModule.runJob("GET_STATION", { stationId }, this)
  896. .then(station => {
  897. next(null, station);
  898. })
  899. .catch(next);
  900. },
  901. (station, next) => {
  902. if (!station) return next("Station not found.");
  903. return next();
  904. }
  905. ],
  906. async (err, userCount) => {
  907. if (err) {
  908. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  909. this.log("ERROR", "STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  910. return cb({ status: "failure", message: err });
  911. }
  912. this.log("SUCCESS", "STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  913. IOModule.runJob("SOCKET_LEAVE_ROOMS", { socketId: session });
  914. delete StationsModule.userList[session.socketId];
  915. return cb({
  916. status: "success",
  917. message: "Successfully left station.",
  918. userCount
  919. });
  920. }
  921. );
  922. },
  923. /**
  924. * Updates a station's name
  925. *
  926. * @param session
  927. * @param stationId - the station id
  928. * @param newName - the new station name
  929. * @param cb
  930. */
  931. updateName: isOwnerRequired(async function updateName(session, stationId, newName, cb) {
  932. const stationModel = await DBModule.runJob(
  933. "GET_MODEL",
  934. {
  935. modelName: "station"
  936. },
  937. this
  938. );
  939. async.waterfall(
  940. [
  941. next => {
  942. stationModel.updateOne(
  943. { _id: stationId },
  944. { $set: { name: newName } },
  945. { runValidators: true },
  946. next
  947. );
  948. },
  949. (res, next) => {
  950. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  951. .then(station => {
  952. next(null, station);
  953. })
  954. .catch(next);
  955. }
  956. ],
  957. async err => {
  958. if (err) {
  959. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  960. this.log(
  961. "ERROR",
  962. "STATIONS_UPDATE_NAME",
  963. `Updating station "${stationId}" name to "${newName}" failed. "${err}"`
  964. );
  965. return cb({ status: "failure", message: err });
  966. }
  967. this.log(
  968. "SUCCESS",
  969. "STATIONS_UPDATE_NAME",
  970. `Updated station "${stationId}" name to "${newName}" successfully.`
  971. );
  972. CacheModule.runJob("PUB", {
  973. channel: "station.nameUpdate",
  974. value: { stationId }
  975. });
  976. return cb({
  977. status: "success",
  978. message: "Successfully updated the name."
  979. });
  980. }
  981. );
  982. }),
  983. /**
  984. * Updates a station's display name
  985. *
  986. * @param session
  987. * @param stationId - the station id
  988. * @param newDisplayName - the new station display name
  989. * @param cb
  990. */
  991. updateDisplayName: isOwnerRequired(async function updateDisplayName(session, stationId, newDisplayName, cb) {
  992. const stationModel = await DBModule.runJob(
  993. "GET_MODEL",
  994. {
  995. modelName: "station"
  996. },
  997. this
  998. );
  999. async.waterfall(
  1000. [
  1001. next => {
  1002. stationModel.updateOne(
  1003. { _id: stationId },
  1004. { $set: { displayName: newDisplayName } },
  1005. { runValidators: true },
  1006. next
  1007. );
  1008. },
  1009. (res, next) => {
  1010. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1011. .then(station => {
  1012. next(null, station);
  1013. })
  1014. .catch(next);
  1015. }
  1016. ],
  1017. async err => {
  1018. if (err) {
  1019. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1020. this.log(
  1021. "ERROR",
  1022. "STATIONS_UPDATE_DISPLAY_NAME",
  1023. `Updating station "${stationId}" displayName to "${newDisplayName}" failed. "${err}"`
  1024. );
  1025. return cb({ status: "failure", message: err });
  1026. }
  1027. this.log(
  1028. "SUCCESS",
  1029. "STATIONS_UPDATE_DISPLAY_NAME",
  1030. `Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`
  1031. );
  1032. CacheModule.runJob("PUB", {
  1033. channel: "station.displayNameUpdate",
  1034. value: { stationId }
  1035. });
  1036. return cb({
  1037. status: "success",
  1038. message: "Successfully updated the display name."
  1039. });
  1040. }
  1041. );
  1042. }),
  1043. /**
  1044. * Updates a station's description
  1045. *
  1046. * @param session
  1047. * @param stationId - the station id
  1048. * @param newDescription - the new station description
  1049. * @param cb
  1050. */
  1051. updateDescription: isOwnerRequired(async function updateDescription(session, stationId, newDescription, cb) {
  1052. const stationModel = await DBModule.runJob(
  1053. "GET_MODEL",
  1054. {
  1055. modelName: "station"
  1056. },
  1057. this
  1058. );
  1059. async.waterfall(
  1060. [
  1061. next => {
  1062. stationModel.updateOne(
  1063. { _id: stationId },
  1064. { $set: { description: newDescription } },
  1065. { runValidators: true },
  1066. next
  1067. );
  1068. },
  1069. (res, next) => {
  1070. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1071. .then(station => {
  1072. next(null, station);
  1073. })
  1074. .catch(next);
  1075. }
  1076. ],
  1077. async err => {
  1078. if (err) {
  1079. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1080. this.log(
  1081. "ERROR",
  1082. "STATIONS_UPDATE_DESCRIPTION",
  1083. `Updating station "${stationId}" description to "${newDescription}" failed. "${err}"`
  1084. );
  1085. return cb({ status: "failure", message: err });
  1086. }
  1087. this.log(
  1088. "SUCCESS",
  1089. "STATIONS_UPDATE_DESCRIPTION",
  1090. `Updated station "${stationId}" description to "${newDescription}" successfully.`
  1091. );
  1092. CacheModule.runJob("PUB", {
  1093. channel: "station.descriptionUpdate",
  1094. value: { stationId }
  1095. });
  1096. return cb({
  1097. status: "success",
  1098. message: "Successfully updated the description."
  1099. });
  1100. }
  1101. );
  1102. }),
  1103. /**
  1104. * Updates a station's privacy
  1105. *
  1106. * @param session
  1107. * @param stationId - the station id
  1108. * @param newPrivacy - the new station privacy
  1109. * @param cb
  1110. */
  1111. updatePrivacy: isOwnerRequired(async function updatePrivacy(session, stationId, newPrivacy, cb) {
  1112. const stationModel = await DBModule.runJob(
  1113. "GET_MODEL",
  1114. {
  1115. modelName: "station"
  1116. },
  1117. this
  1118. );
  1119. let previousPrivacy = null;
  1120. async.waterfall(
  1121. [
  1122. next => {
  1123. stationModel.findOne({ _id: stationId }, next);
  1124. },
  1125. (station, next) => {
  1126. if (!station) next("No station found.");
  1127. else {
  1128. previousPrivacy = station.privacy;
  1129. next();
  1130. }
  1131. },
  1132. next => {
  1133. stationModel.updateOne(
  1134. { _id: stationId },
  1135. { $set: { privacy: newPrivacy } },
  1136. { runValidators: true },
  1137. next
  1138. );
  1139. },
  1140. (res, next) => {
  1141. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1142. .then(station => {
  1143. next(null, station);
  1144. })
  1145. .catch(next);
  1146. }
  1147. ],
  1148. async err => {
  1149. if (err) {
  1150. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1151. this.log(
  1152. "ERROR",
  1153. "STATIONS_UPDATE_PRIVACY",
  1154. `Updating station "${stationId}" privacy to "${newPrivacy}" failed. "${err}"`
  1155. );
  1156. return cb({ status: "failure", message: err });
  1157. }
  1158. this.log(
  1159. "SUCCESS",
  1160. "STATIONS_UPDATE_PRIVACY",
  1161. `Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`
  1162. );
  1163. CacheModule.runJob("PUB", {
  1164. channel: "station.privacyUpdate",
  1165. value: { stationId, previousPrivacy }
  1166. });
  1167. return cb({
  1168. status: "success",
  1169. message: "Successfully updated the privacy."
  1170. });
  1171. }
  1172. );
  1173. }),
  1174. /**
  1175. * Updates a station's genres
  1176. *
  1177. * @param session
  1178. * @param stationId - the station id
  1179. * @param newGenres - the new station genres
  1180. * @param cb
  1181. */
  1182. updateGenres: isOwnerRequired(async function updateGenres(session, stationId, newGenres, cb) {
  1183. const stationModel = await DBModule.runJob(
  1184. "GET_MODEL",
  1185. {
  1186. modelName: "station"
  1187. },
  1188. this
  1189. );
  1190. async.waterfall(
  1191. [
  1192. next => {
  1193. stationModel.updateOne(
  1194. { _id: stationId },
  1195. { $set: { genres: newGenres } },
  1196. { runValidators: true },
  1197. next
  1198. );
  1199. },
  1200. (res, next) => {
  1201. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1202. .then(station => {
  1203. next(null, station);
  1204. })
  1205. .catch(next);
  1206. }
  1207. ],
  1208. async err => {
  1209. if (err) {
  1210. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1211. this.log(
  1212. "ERROR",
  1213. "STATIONS_UPDATE_GENRES",
  1214. `Updating station "${stationId}" genres to "${newGenres}" failed. "${err}"`
  1215. );
  1216. return cb({ status: "failure", message: err });
  1217. }
  1218. this.log(
  1219. "SUCCESS",
  1220. "STATIONS_UPDATE_GENRES",
  1221. `Updated station "${stationId}" genres to "${newGenres}" successfully.`
  1222. );
  1223. return cb({
  1224. status: "success",
  1225. message: "Successfully updated the genres."
  1226. });
  1227. }
  1228. );
  1229. }),
  1230. /**
  1231. * Updates a station's blacklisted genres
  1232. *
  1233. * @param session
  1234. * @param stationId - the station id
  1235. * @param newBlacklistedGenres - the new station blacklisted genres
  1236. * @param cb
  1237. */
  1238. updateBlacklistedGenres: isOwnerRequired(async function updateBlacklistedGenres(
  1239. session,
  1240. stationId,
  1241. newBlacklistedGenres,
  1242. cb
  1243. ) {
  1244. const stationModel = await DBModule.runJob(
  1245. "GET_MODEL",
  1246. {
  1247. modelName: "station"
  1248. },
  1249. this
  1250. );
  1251. async.waterfall(
  1252. [
  1253. next => {
  1254. stationModel.updateOne(
  1255. { _id: stationId },
  1256. {
  1257. $set: {
  1258. blacklistedGenres: newBlacklistedGenres
  1259. }
  1260. },
  1261. { runValidators: true },
  1262. next
  1263. );
  1264. },
  1265. (res, next) => {
  1266. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1267. .then(station => {
  1268. next(null, station);
  1269. })
  1270. .catch(next);
  1271. }
  1272. ],
  1273. async err => {
  1274. if (err) {
  1275. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1276. this.log(
  1277. "ERROR",
  1278. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1279. `Updating station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" failed. "${err}"`
  1280. );
  1281. return cb({ status: "failure", message: err });
  1282. }
  1283. this.log(
  1284. "SUCCESS",
  1285. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1286. `Updated station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" successfully.`
  1287. );
  1288. return cb({
  1289. status: "success",
  1290. message: "Successfully updated the blacklisted genres."
  1291. });
  1292. }
  1293. );
  1294. }),
  1295. /**
  1296. * Updates a station's party mode
  1297. *
  1298. * @param session
  1299. * @param stationId - the station id
  1300. * @param newPartyMode - the new station party mode
  1301. * @param cb
  1302. */
  1303. updatePartyMode: isOwnerRequired(async function updatePartyMode(session, stationId, newPartyMode, cb) {
  1304. const stationModel = await DBModule.runJob(
  1305. "GET_MODEL",
  1306. {
  1307. modelName: "station"
  1308. },
  1309. this
  1310. );
  1311. async.waterfall(
  1312. [
  1313. next => {
  1314. StationsModule.runJob("GET_STATION", { stationId }, this)
  1315. .then(station => {
  1316. next(null, station);
  1317. })
  1318. .catch(next);
  1319. },
  1320. (station, next) => {
  1321. if (!station) return next("Station not found.");
  1322. if (station.partyMode === newPartyMode)
  1323. return next(`The party mode was already ${newPartyMode ? "enabled." : "disabled."}`);
  1324. return stationModel.updateOne(
  1325. { _id: stationId },
  1326. { $set: { partyMode: newPartyMode } },
  1327. { runValidators: true },
  1328. next
  1329. );
  1330. },
  1331. (res, next) => {
  1332. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1333. .then(station => {
  1334. next(null, station);
  1335. })
  1336. .catch(next);
  1337. }
  1338. ],
  1339. async err => {
  1340. if (err) {
  1341. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1342. this.log(
  1343. "ERROR",
  1344. "STATIONS_UPDATE_PARTY_MODE",
  1345. `Updating station "${stationId}" party mode to "${newPartyMode}" failed. "${err}"`
  1346. );
  1347. return cb({ status: "failure", message: err });
  1348. }
  1349. this.log(
  1350. "SUCCESS",
  1351. "STATIONS_UPDATE_PARTY_MODE",
  1352. `Updated station "${stationId}" party mode to "${newPartyMode}" successfully.`
  1353. );
  1354. CacheModule.runJob("PUB", {
  1355. channel: "station.updatePartyMode",
  1356. value: {
  1357. stationId,
  1358. partyMode: newPartyMode
  1359. }
  1360. });
  1361. StationsModule.runJob("SKIP_STATION", { stationId });
  1362. return cb({
  1363. status: "success",
  1364. message: "Successfully updated the party mode."
  1365. });
  1366. }
  1367. );
  1368. }),
  1369. /**
  1370. * Pauses a station
  1371. *
  1372. * @param session
  1373. * @param stationId - the station id
  1374. * @param cb
  1375. */
  1376. pause: isOwnerRequired(async function pause(session, stationId, cb) {
  1377. const stationModel = await DBModule.runJob(
  1378. "GET_MODEL",
  1379. {
  1380. modelName: "station"
  1381. },
  1382. this
  1383. );
  1384. async.waterfall(
  1385. [
  1386. next => {
  1387. StationsModule.runJob("GET_STATION", { stationId }, this)
  1388. .then(station => {
  1389. next(null, station);
  1390. })
  1391. .catch(next);
  1392. },
  1393. (station, next) => {
  1394. if (!station) return next("Station not found.");
  1395. if (station.paused) return next("That station was already paused.");
  1396. return stationModel.updateOne(
  1397. { _id: stationId },
  1398. { $set: { paused: true, pausedAt: Date.now() } },
  1399. next
  1400. );
  1401. },
  1402. (res, next) => {
  1403. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1404. .then(station => {
  1405. next(null, station);
  1406. })
  1407. .catch(next);
  1408. }
  1409. ],
  1410. async err => {
  1411. if (err) {
  1412. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1413. this.log("ERROR", "STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  1414. return cb({ status: "failure", message: err });
  1415. }
  1416. this.log("SUCCESS", "STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  1417. CacheModule.runJob("PUB", {
  1418. channel: "station.pause",
  1419. value: stationId
  1420. });
  1421. NotificationsModule.runJob("UNSCHEDULE", {
  1422. name: `stations.nextSong?id=${stationId}`
  1423. });
  1424. return cb({
  1425. status: "success",
  1426. message: "Successfully paused."
  1427. });
  1428. }
  1429. );
  1430. }),
  1431. /**
  1432. * Resumes a station
  1433. *
  1434. * @param session
  1435. * @param stationId - the station id
  1436. * @param cb
  1437. */
  1438. resume: isOwnerRequired(async function resume(session, stationId, cb) {
  1439. const stationModel = await DBModule.runJob(
  1440. "GET_MODEL",
  1441. {
  1442. modelName: "station"
  1443. },
  1444. this
  1445. );
  1446. async.waterfall(
  1447. [
  1448. next => {
  1449. StationsModule.runJob("GET_STATION", { stationId }, this)
  1450. .then(station => {
  1451. next(null, station);
  1452. })
  1453. .catch(next);
  1454. },
  1455. (station, next) => {
  1456. if (!station) return next("Station not found.");
  1457. if (!station.paused) return next("That station is not paused.");
  1458. station.timePaused += Date.now() - station.pausedAt;
  1459. return stationModel.updateOne(
  1460. { _id: stationId },
  1461. {
  1462. $set: { paused: false },
  1463. $inc: { timePaused: Date.now() - station.pausedAt }
  1464. },
  1465. next
  1466. );
  1467. },
  1468. (res, next) => {
  1469. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1470. .then(station => {
  1471. next(null, station);
  1472. })
  1473. .catch(next);
  1474. }
  1475. ],
  1476. async err => {
  1477. if (err) {
  1478. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1479. this.log("ERROR", "STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  1480. return cb({ status: "failure", message: err });
  1481. }
  1482. this.log("SUCCESS", "STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  1483. CacheModule.runJob("PUB", {
  1484. channel: "station.resume",
  1485. value: stationId
  1486. });
  1487. return cb({
  1488. status: "success",
  1489. message: "Successfully resumed."
  1490. });
  1491. }
  1492. );
  1493. }),
  1494. /**
  1495. * Removes a station
  1496. *
  1497. * @param session
  1498. * @param stationId - the station id
  1499. * @param cb
  1500. */
  1501. remove: isOwnerRequired(async function remove(session, stationId, cb) {
  1502. const stationModel = await DBModule.runJob(
  1503. "GET_MODEL",
  1504. {
  1505. modelName: "station"
  1506. },
  1507. this
  1508. );
  1509. async.waterfall(
  1510. [
  1511. next => {
  1512. stationModel.deleteOne({ _id: stationId }, err => next(err));
  1513. },
  1514. next => {
  1515. CacheModule.runJob("HDEL", { table: "stations", key: stationId }, this).then(next).catch(next);
  1516. }
  1517. ],
  1518. async err => {
  1519. if (err) {
  1520. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1521. this.log("ERROR", "STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  1522. return cb({ status: "failure", message: err });
  1523. }
  1524. this.log("SUCCESS", "STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  1525. CacheModule.runJob("PUB", {
  1526. channel: "station.remove",
  1527. value: stationId
  1528. });
  1529. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1530. userId: session.userId,
  1531. activityType: "deleted_station",
  1532. payload: [stationId]
  1533. });
  1534. return cb({
  1535. status: "success",
  1536. message: "Successfully removed."
  1537. });
  1538. }
  1539. );
  1540. }),
  1541. /**
  1542. * Create a station
  1543. *
  1544. * @param session
  1545. * @param data - the station data
  1546. * @param cb
  1547. */
  1548. create: isLoginRequired(async function create(session, data, cb) {
  1549. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  1550. const stationModel = await DBModule.runJob(
  1551. "GET_MODEL",
  1552. {
  1553. modelName: "station"
  1554. },
  1555. this
  1556. );
  1557. data.name = data.name.toLowerCase();
  1558. const blacklist = [
  1559. "country",
  1560. "edm",
  1561. "musare",
  1562. "hip-hop",
  1563. "rap",
  1564. "top-hits",
  1565. "todays-hits",
  1566. "old-school",
  1567. "christmas",
  1568. "about",
  1569. "support",
  1570. "staff",
  1571. "help",
  1572. "news",
  1573. "terms",
  1574. "privacy",
  1575. "profile",
  1576. "c",
  1577. "community",
  1578. "tos",
  1579. "login",
  1580. "register",
  1581. "p",
  1582. "official",
  1583. "o",
  1584. "trap",
  1585. "faq",
  1586. "team",
  1587. "donate",
  1588. "buy",
  1589. "shop",
  1590. "forums",
  1591. "explore",
  1592. "settings",
  1593. "admin",
  1594. "auth",
  1595. "reset_password"
  1596. ];
  1597. async.waterfall(
  1598. [
  1599. next => {
  1600. if (!data) return next("Invalid data.");
  1601. return next();
  1602. },
  1603. next => {
  1604. stationModel.findOne(
  1605. {
  1606. $or: [
  1607. { name: data.name },
  1608. {
  1609. displayName: new RegExp(`^${data.displayName}$`, "i")
  1610. }
  1611. ]
  1612. },
  1613. next
  1614. );
  1615. },
  1616. // eslint-disable-next-line consistent-return
  1617. (station, next) => {
  1618. this.log(station);
  1619. if (station) return next("A station with that name or display name already exists.");
  1620. const { name, displayName, description, genres, playlist, type, blacklistedGenres } = data;
  1621. if (type === "official") {
  1622. return userModel.findOne({ _id: session.userId }, (err, user) => {
  1623. if (err) return next(err);
  1624. if (!user) return next("User not found.");
  1625. if (user.role !== "admin") return next("Admin required.");
  1626. return stationModel.create(
  1627. {
  1628. name,
  1629. displayName,
  1630. description,
  1631. type,
  1632. privacy: "private",
  1633. playlist,
  1634. genres,
  1635. blacklistedGenres,
  1636. currentSong: StationsModule.defaultSong
  1637. },
  1638. next
  1639. );
  1640. });
  1641. }
  1642. if (type === "community") {
  1643. if (blacklist.indexOf(name) !== -1)
  1644. return next("That name is blacklisted. Please use a different name.");
  1645. return stationModel.create(
  1646. {
  1647. name,
  1648. displayName,
  1649. description,
  1650. type,
  1651. privacy: "private",
  1652. owner: session.userId,
  1653. queue: [],
  1654. currentSong: null
  1655. },
  1656. next
  1657. );
  1658. }
  1659. }
  1660. ],
  1661. async (err, station) => {
  1662. if (err) {
  1663. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1664. this.log("ERROR", "STATIONS_CREATE", `Creating station failed. "${err}"`);
  1665. return cb({ status: "failure", message: err });
  1666. }
  1667. this.log("SUCCESS", "STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  1668. CacheModule.runJob("PUB", {
  1669. channel: "station.create",
  1670. value: station._id
  1671. });
  1672. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1673. userId: session.userId,
  1674. activityType: "created_station",
  1675. payload: [station._id]
  1676. });
  1677. return cb({
  1678. status: "success",
  1679. message: "Successfully created station."
  1680. });
  1681. }
  1682. );
  1683. }),
  1684. /**
  1685. * Adds song to station queue
  1686. *
  1687. * @param session
  1688. * @param stationId - the station id
  1689. * @param songId - the song id
  1690. * @param cb
  1691. */
  1692. addToQueue: isLoginRequired(async function addToQueue(session, stationId, songId, cb) {
  1693. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  1694. const stationModel = await DBModule.runJob(
  1695. "GET_MODEL",
  1696. {
  1697. modelName: "station"
  1698. },
  1699. this
  1700. );
  1701. async.waterfall(
  1702. [
  1703. next => {
  1704. StationsModule.runJob("GET_STATION", { stationId }, this)
  1705. .then(station => {
  1706. next(null, station);
  1707. })
  1708. .catch(next);
  1709. },
  1710. (station, next) => {
  1711. if (!station) return next("Station not found.");
  1712. if (station.locked) {
  1713. return userModel.findOne({ _id: session.userId }, (err, user) => {
  1714. if (user.role !== "admin" && station.owner !== session.userId)
  1715. return next("Only owners and admins can add songs to a locked queue.");
  1716. return next(null, station);
  1717. });
  1718. }
  1719. return next(null, station);
  1720. },
  1721. (station, next) => {
  1722. if (station.type !== "community") return next("That station is not a community station.");
  1723. return StationsModule.runJob(
  1724. "CAN_USER_VIEW_STATION",
  1725. {
  1726. station,
  1727. userId: session.userId
  1728. },
  1729. this
  1730. )
  1731. .then(canView => {
  1732. if (canView) return next(null, station);
  1733. return next("Insufficient permissions.");
  1734. })
  1735. .catch(err => next(err));
  1736. },
  1737. (station, next) => {
  1738. if (station.currentSong && station.currentSong.songId === songId)
  1739. return next("That song is currently playing.");
  1740. return async.each(
  1741. station.queue,
  1742. (queueSong, next) => {
  1743. if (queueSong.songId === songId) return next("That song is already in the queue.");
  1744. return next();
  1745. },
  1746. err => next(err, station)
  1747. );
  1748. },
  1749. (station, next) => {
  1750. SongsModule.runJob("GET_SONG_FROM_ID", { songId }, this)
  1751. .then(res => {
  1752. if (res.song) return next(null, res.song, station);
  1753. return YouTubeModule.runJob("GET_SONG", { songId }, this)
  1754. .then(response => {
  1755. const { song } = response;
  1756. song.artists = [];
  1757. song.skipDuration = 0;
  1758. song.likes = -1;
  1759. song.dislikes = -1;
  1760. song.thumbnail = "empty";
  1761. song.explicit = false;
  1762. return next(null, song, station);
  1763. })
  1764. .catch(err => next(err));
  1765. })
  1766. .catch(err => next(err));
  1767. },
  1768. (song, station, next) => {
  1769. const { queue } = station;
  1770. song.requestedBy = session.userId;
  1771. song.requestedAt = Date.now();
  1772. queue.push(song);
  1773. let totalDuration = 0;
  1774. queue.forEach(song => {
  1775. totalDuration += song.duration;
  1776. });
  1777. if (totalDuration >= 3600 * 3) return next("The max length of the queue is 3 hours.");
  1778. return next(null, song, station);
  1779. },
  1780. (song, station, next) => {
  1781. const { queue } = station;
  1782. if (queue.length === 0) return next(null, song, station);
  1783. let totalDuration = 0;
  1784. const userId = queue[queue.length - 1].requestedBy;
  1785. station.queue.forEach(song => {
  1786. if (userId === song.requestedBy) {
  1787. totalDuration += song.duration;
  1788. }
  1789. });
  1790. if (totalDuration >= 900) return next("The max length of songs per user is 15 minutes.");
  1791. return next(null, song, station);
  1792. },
  1793. (song, station, next) => {
  1794. const { queue } = station;
  1795. if (queue.length === 0) return next(null, song);
  1796. let totalSongs = 0;
  1797. const userId = queue[queue.length - 1].requestedBy;
  1798. queue.forEach(song => {
  1799. if (userId === song.requestedBy) {
  1800. totalSongs += 1;
  1801. }
  1802. });
  1803. if (totalSongs <= 2) return next(null, song);
  1804. if (totalSongs > 3)
  1805. return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  1806. if (queue[queue.length - 2].requestedBy !== userId || queue[queue.length - 3] !== userId)
  1807. return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  1808. return next(null, song);
  1809. },
  1810. (song, next) => {
  1811. stationModel.updateOne(
  1812. { _id: stationId },
  1813. { $push: { queue: song } },
  1814. { runValidators: true },
  1815. next
  1816. );
  1817. },
  1818. (res, next) => {
  1819. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1820. .then(station => {
  1821. next(null, station);
  1822. })
  1823. .catch(next);
  1824. }
  1825. ],
  1826. async err => {
  1827. if (err) {
  1828. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1829. this.log(
  1830. "ERROR",
  1831. "STATIONS_ADD_SONG_TO_QUEUE",
  1832. `Adding song "${songId}" to station "${stationId}" queue failed. "${err}"`
  1833. );
  1834. return cb({ status: "failure", message: err });
  1835. }
  1836. this.log(
  1837. "SUCCESS",
  1838. "STATIONS_ADD_SONG_TO_QUEUE",
  1839. `Added song "${songId}" to station "${stationId}" successfully.`
  1840. );
  1841. CacheModule.runJob("PUB", {
  1842. channel: "station.queueUpdate",
  1843. value: stationId
  1844. });
  1845. return cb({
  1846. status: "success",
  1847. message: "Successfully added song to queue."
  1848. });
  1849. }
  1850. );
  1851. }),
  1852. /**
  1853. * Removes song from station queue
  1854. *
  1855. * @param session
  1856. * @param stationId - the station id
  1857. * @param songId - the song id
  1858. * @param cb
  1859. */
  1860. removeFromQueue: isOwnerRequired(async function removeFromQueue(session, stationId, songId, cb) {
  1861. const stationModel = await DBModule.runJob(
  1862. "GET_MODEL",
  1863. {
  1864. modelName: "station"
  1865. },
  1866. this
  1867. );
  1868. async.waterfall(
  1869. [
  1870. next => {
  1871. if (!songId) return next("Invalid song id.");
  1872. return StationsModule.runJob("GET_STATION", { stationId }, this)
  1873. .then(station => {
  1874. next(null, station);
  1875. })
  1876. .catch(next);
  1877. },
  1878. (station, next) => {
  1879. if (!station) return next("Station not found.");
  1880. if (station.type !== "community") return next("Station is not a community station.");
  1881. return async.each(
  1882. station.queue,
  1883. (queueSong, next) => {
  1884. if (queueSong.songId === songId) return next(true);
  1885. return next();
  1886. },
  1887. err => {
  1888. if (err === true) return next();
  1889. return next("Song is not currently in the queue.");
  1890. }
  1891. );
  1892. },
  1893. next => {
  1894. stationModel.updateOne({ _id: stationId }, { $pull: { queue: { songId } } }, next);
  1895. },
  1896. (res, next) => {
  1897. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1898. .then(station => {
  1899. next(null, station);
  1900. })
  1901. .catch(next);
  1902. }
  1903. ],
  1904. async err => {
  1905. if (err) {
  1906. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1907. this.log(
  1908. "ERROR",
  1909. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1910. `Removing song "${songId}" from station "${stationId}" queue failed. "${err}"`
  1911. );
  1912. return cb({ status: "failure", message: err });
  1913. }
  1914. this.log(
  1915. "SUCCESS",
  1916. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1917. `Removed song "${songId}" from station "${stationId}" successfully.`
  1918. );
  1919. CacheModule.runJob("PUB", {
  1920. channel: "station.queueUpdate",
  1921. value: stationId
  1922. });
  1923. return cb({
  1924. status: "success",
  1925. message: "Successfully removed song from queue."
  1926. });
  1927. }
  1928. );
  1929. }),
  1930. /**
  1931. * Gets the queue from a station
  1932. *
  1933. * @param {object} session - user session
  1934. * @param {string} stationId - the station id
  1935. * @param {Function} cb - callback
  1936. */
  1937. getQueue(session, stationId, cb) {
  1938. async.waterfall(
  1939. [
  1940. next => {
  1941. StationsModule.runJob("GET_STATION", { stationId }, this)
  1942. .then(station => {
  1943. next(null, station);
  1944. })
  1945. .catch(next);
  1946. },
  1947. (station, next) => {
  1948. if (!station) return next("Station not found.");
  1949. if (station.type !== "community") return next("Station is not a community station.");
  1950. return next(null, station);
  1951. },
  1952. (station, next) => {
  1953. StationsModule.runJob(
  1954. "CAN_USER_VIEW_STATION",
  1955. {
  1956. station,
  1957. userId: session.userId
  1958. },
  1959. this
  1960. )
  1961. .then(canView => {
  1962. if (canView) return next(null, station);
  1963. return next("Insufficient permissions.");
  1964. })
  1965. .catch(err => next(err));
  1966. }
  1967. ],
  1968. async (err, station) => {
  1969. if (err) {
  1970. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1971. this.log(
  1972. "ERROR",
  1973. "STATIONS_GET_QUEUE",
  1974. `Getting queue for station "${stationId}" failed. "${err}"`
  1975. );
  1976. return cb({ status: "failure", message: err });
  1977. }
  1978. this.log("SUCCESS", "STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  1979. return cb({
  1980. status: "success",
  1981. message: "Successfully got queue.",
  1982. queue: station.queue
  1983. });
  1984. }
  1985. );
  1986. },
  1987. /**
  1988. * Selects a private playlist for a station
  1989. *
  1990. * @param session
  1991. * @param stationId - the station id
  1992. * @param playlistId - the private playlist id
  1993. * @param cb
  1994. */
  1995. selectPrivatePlaylist: isOwnerRequired(async function selectPrivatePlaylist(session, stationId, playlistId, cb) {
  1996. const stationModel = await DBModule.runJob(
  1997. "GET_MODEL",
  1998. {
  1999. modelName: "station"
  2000. },
  2001. this
  2002. );
  2003. const playlistModel = await DBModule.runJob(
  2004. "GET_MODEL",
  2005. {
  2006. modelName: "playlist"
  2007. },
  2008. this
  2009. );
  2010. async.waterfall(
  2011. [
  2012. next => {
  2013. StationsModule.runJob("GET_STATION", { stationId }, this)
  2014. .then(station => {
  2015. next(null, station);
  2016. })
  2017. .catch(next);
  2018. },
  2019. (station, next) => {
  2020. if (!station) return next("Station not found.");
  2021. if (station.type !== "community") return next("Station is not a community station.");
  2022. if (station.privatePlaylist === playlistId)
  2023. return next("That private playlist is already selected.");
  2024. return playlistModel.findOne({ _id: playlistId }, next);
  2025. },
  2026. (playlist, next) => {
  2027. if (!playlist) return next("Playlist not found.");
  2028. const currentSongIndex = playlist.songs.length > 0 ? playlist.songs.length - 1 : 0;
  2029. return stationModel.updateOne(
  2030. { _id: stationId },
  2031. {
  2032. $set: {
  2033. privatePlaylist: playlistId,
  2034. currentSongIndex
  2035. }
  2036. },
  2037. { runValidators: true },
  2038. next
  2039. );
  2040. },
  2041. (res, next) => {
  2042. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2043. .then(station => {
  2044. next(null, station);
  2045. })
  2046. .catch(next);
  2047. }
  2048. ],
  2049. async (err, station) => {
  2050. if (err) {
  2051. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2052. this.log(
  2053. "ERROR",
  2054. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  2055. `Selecting private playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2056. );
  2057. return cb({ status: "failure", message: err });
  2058. }
  2059. this.log(
  2060. "SUCCESS",
  2061. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  2062. `Selected private playlist "${playlistId}" for station "${stationId}" successfully.`
  2063. );
  2064. NotificationsModule.runJob("UNSCHEDULE", {
  2065. name: `stations.nextSong?id${stationId}`
  2066. });
  2067. if (!station.partyMode) StationsModule.runJob("SKIP_STATION", { stationId });
  2068. CacheModule.runJob("PUB", {
  2069. channel: "privatePlaylist.selected",
  2070. value: {
  2071. playlistId,
  2072. stationId
  2073. }
  2074. });
  2075. return cb({
  2076. status: "success",
  2077. message: "Successfully selected playlist."
  2078. });
  2079. }
  2080. );
  2081. }),
  2082. /**
  2083. * Deselects the private playlist selected in a station
  2084. *
  2085. * @param session
  2086. * @param stationId - the station id
  2087. * @param cb
  2088. */
  2089. deselectPrivatePlaylist: isOwnerRequired(async function deselectPrivatePlaylist(session, stationId, cb) {
  2090. const stationModel = await DBModule.runJob(
  2091. "GET_MODEL",
  2092. {
  2093. modelName: "station"
  2094. },
  2095. this
  2096. );
  2097. async.waterfall(
  2098. [
  2099. next => {
  2100. StationsModule.runJob("GET_STATION", { stationId }, this)
  2101. .then(station => {
  2102. next(null, station);
  2103. })
  2104. .catch(next);
  2105. },
  2106. (station, next) => {
  2107. if (!station) return next("Station not found.");
  2108. if (station.type !== "community") return next("Station is not a community station.");
  2109. if (!station.privatePlaylist) return next("No private playlist is currently selected.");
  2110. return stationModel.updateOne(
  2111. { _id: stationId },
  2112. {
  2113. $set: {
  2114. privatePlaylist: null,
  2115. currentSongIndex: 0
  2116. }
  2117. },
  2118. { runValidators: true },
  2119. next
  2120. );
  2121. },
  2122. (res, next) => {
  2123. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2124. .then(station => {
  2125. next(null, station);
  2126. })
  2127. .catch(next);
  2128. }
  2129. ],
  2130. async (err, station) => {
  2131. if (err) {
  2132. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2133. this.log(
  2134. "ERROR",
  2135. "STATIONS_DESELECT_PRIVATE_PLAYLIST",
  2136. `Deselecting private playlist for station "${stationId}" failed. "${err}"`
  2137. );
  2138. return cb({ status: "failure", message: err });
  2139. }
  2140. this.log(
  2141. "SUCCESS",
  2142. "STATIONS_DESELECT_PRIVATE_PLAYLIST",
  2143. `Deselected private playlist for station "${stationId}" successfully.`
  2144. );
  2145. NotificationsModule.runJob("UNSCHEDULE", {
  2146. name: `stations.nextSong?id${stationId}`
  2147. });
  2148. if (!station.partyMode) StationsModule.runJob("SKIP_STATION", { stationId });
  2149. CacheModule.runJob("PUB", {
  2150. channel: "privatePlaylist.deselected",
  2151. value: {
  2152. stationId
  2153. }
  2154. });
  2155. return cb({
  2156. status: "success",
  2157. message: "Successfully deselected playlist."
  2158. });
  2159. }
  2160. );
  2161. }),
  2162. favoriteStation: isLoginRequired(async function favoriteStation(session, stationId, cb) {
  2163. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  2164. async.waterfall(
  2165. [
  2166. next => {
  2167. StationsModule.runJob("GET_STATION", { stationId }, this)
  2168. .then(station => {
  2169. next(null, station);
  2170. })
  2171. .catch(next);
  2172. },
  2173. (station, next) => {
  2174. if (!station) return next("Station not found.");
  2175. return StationsModule.runJob(
  2176. "CAN_USER_VIEW_STATION",
  2177. {
  2178. station,
  2179. userId: session.userId
  2180. },
  2181. this
  2182. )
  2183. .then(canView => {
  2184. if (canView) return next();
  2185. return next("Insufficient permissions.");
  2186. })
  2187. .catch(err => next(err));
  2188. },
  2189. next => {
  2190. userModel.updateOne({ _id: session.userId }, { $addToSet: { favoriteStations: stationId } }, next);
  2191. },
  2192. (res, next) => {
  2193. if (res.nModified === 0) return next("The station was already favorited.");
  2194. return next();
  2195. }
  2196. ],
  2197. async err => {
  2198. if (err) {
  2199. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2200. this.log("ERROR", "FAVORITE_STATION", `Favoriting station "${stationId}" failed. "${err}"`);
  2201. return cb({ status: "failure", message: err });
  2202. }
  2203. this.log("SUCCESS", "FAVORITE_STATION", `Favorited station "${stationId}" successfully.`);
  2204. CacheModule.runJob("PUB", {
  2205. channel: "user.favoritedStation",
  2206. value: {
  2207. userId: session.userId,
  2208. stationId
  2209. }
  2210. });
  2211. return cb({
  2212. status: "success",
  2213. message: "Succesfully favorited station."
  2214. });
  2215. }
  2216. );
  2217. }),
  /**
   * Removes a station from the favorite stations of the logged-in user
   *
   * Fails when the update modified nothing (the station wasn't favorited). On success
   * `user.unfavoritedStation` is published with the user id and station id.
   *
   * @param session - the session of the requesting user; `session.userId` selects the user to update
   * @param stationId - the station id
   * @param cb - called once with `{ status: "success" | "failure", message }`
   */
  unfavoriteStation: isLoginRequired(async function unfavoriteStation(session, stationId, cb) {
    const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);

    async.waterfall(
      [
        next => {
          userModel.updateOne({ _id: session.userId }, { $pull: { favoriteStations: stationId } }, next);
        },

        (res, next) => {
          if (res.nModified === 0) return next("The station wasn't favorited.");
          return next();
        }
      ],
      async err => {
        if (err) {
          err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
          this.log("ERROR", "UNFAVORITE_STATION", `Unfavoriting station "${stationId}" failed. "${err}"`);
          return cb({ status: "failure", message: err });
        }

        this.log("SUCCESS", "UNFAVORITE_STATION", `Unfavorited station "${stationId}" successfully.`);

        // Fire-and-forget publish: log a failure rather than leaving an unhandled rejection.
        Promise.resolve(
          CacheModule.runJob("PUB", {
            channel: "user.unfavoritedStation",
            value: {
              userId: session.userId,
              stationId
            }
          })
        ).catch(error => {
          this.log(
            "ERROR",
            "UNFAVORITE_STATION",
            `Publishing "user.unfavoritedStation" for station "${stationId}" failed. "${error}"`
          );
        });

        return cb({
          status: "success",
          message: "Successfully unfavorited station."
        });
      }
    );
  })
  2251. };