stations.js 86 KB


  1. "use strict";
  2. const async = require("async"),
  3. request = require("request"),
  4. config = require("config"),
  5. _ = require("underscore")._;
  6. const hooks = require("./hooks");
  7. const db = require("../db");
  8. const cache = require("../cache");
  9. const notifications = require("../notifications");
  10. const utils = require("../utils");
  11. const stations = require("../stations");
  12. const songs = require("../songs");
  13. const activities = require("../activities");
  14. // const logger = moduleManager.modules["logger"];
  15. let userList = {};
  16. let usersPerStation = {};
  17. let usersPerStationCount = {};
  18. // Temporarily disabled until the messages in console can be limited
  19. // setInterval(async () => {
  20. // let stationsCountUpdated = [];
  21. // let stationsUpdated = [];
  22. // let oldUsersPerStation = usersPerStation;
  23. // usersPerStation = {};
  24. // let oldUsersPerStationCount = usersPerStationCount;
  25. // usersPerStationCount = {};
  26. // const userModel = await db.runJob("GET_MODEL", {
  27. // modelName: "user",
  28. // });
  29. //
  30. // async.each(
  31. // Object.keys(userList),
  32. // function(socketId, next) {
  33. // utils.runJob("SOCKET_FROM_SESSION", { socketId }).then((socket) => {
  34. // let stationId = userList[socketId];
  35. // if (
  36. // !socket ||
  37. // Object.keys(socket.rooms).indexOf(
  38. // `station.${stationId}`
  39. // ) === -1
  40. // ) {
  41. // if (stationsCountUpdated.indexOf(stationId) === -1)
  42. // stationsCountUpdated.push(stationId);
  43. // if (stationsUpdated.indexOf(stationId) === -1)
  44. // stationsUpdated.push(stationId);
  45. // delete userList[socketId];
  46. // return next();
  47. // }
  48. // if (!usersPerStationCount[stationId])
  49. // usersPerStationCount[stationId] = 0;
  50. // usersPerStationCount[stationId]++;
  51. // if (!usersPerStation[stationId])
  52. // usersPerStation[stationId] = [];
  53. // async.waterfall(
  54. // [
  55. // (next) => {
  56. // if (!socket.session || !socket.session.sessionId)
  57. // return next("No session found.");
  58. // cache
  59. // .runJob("HGET", {
  60. // table: "sessions",
  61. // key: socket.session.sessionId,
  62. // })
  63. // .then((session) => next(null, session))
  64. // .catch(next);
  65. // },
  66. // (session, next) => {
  67. // if (!session) return next("Session not found.");
  68. // userModel.findOne({ _id: session.userId }, next);
  69. // },
  70. // (user, next) => {
  71. // if (!user) return next("User not found.");
  72. // if (
  73. // usersPerStation[stationId].indexOf(
  74. // user.username
  75. // ) !== -1
  76. // )
  77. // return next("User already in the list.");
  78. // next(null, user.username);
  79. // },
  80. // ],
  81. // (err, username) => {
  82. // if (!err) {
  83. // usersPerStation[stationId].push(username);
  84. // }
  85. // next();
  86. // }
  87. // );
  88. // });
  89. // //TODO Code to show users
  90. // },
  91. // (err) => {
  92. // for (let stationId in usersPerStationCount) {
  93. // if (
  94. // oldUsersPerStationCount[stationId] !==
  95. // usersPerStationCount[stationId]
  96. // ) {
  97. // if (stationsCountUpdated.indexOf(stationId) === -1)
  98. // stationsCountUpdated.push(stationId);
  99. // }
  100. // }
  101. // for (let stationId in usersPerStation) {
  102. // if (
  103. // _.difference(
  104. // usersPerStation[stationId],
  105. // oldUsersPerStation[stationId]
  106. // ).length > 0 ||
  107. // _.difference(
  108. // oldUsersPerStation[stationId],
  109. // usersPerStation[stationId]
  110. // ).length > 0
  111. // ) {
  112. // if (stationsUpdated.indexOf(stationId) === -1)
  113. // stationsUpdated.push(stationId);
  114. // }
  115. // }
  116. // stationsCountUpdated.forEach((stationId) => {
  117. // //console.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  118. // cache.runJob("PUB", {
  119. // table: "station.updateUserCount",
  120. // value: stationId,
  121. // });
  122. // });
  123. // stationsUpdated.forEach((stationId) => {
  124. // //console.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  125. // cache.runJob("PUB", {
  126. // table: "station.updateUsers",
  127. // value: stationId,
  128. // });
  129. // });
  130. // //console.log("Userlist", usersPerStation);
  131. // }
  132. // );
  133. // }, 3000);
  134. cache.runJob("SUB", {
  135. channel: "station.updateUsers",
  136. cb: (stationId) => {
  137. let list = usersPerStation[stationId] || [];
  138. utils.runJob("EMIT_TO_ROOM", {
  139. room: `station.${stationId}`,
  140. args: ["event:users.updated", list],
  141. });
  142. },
  143. });
  144. cache.runJob("SUB", {
  145. channel: "station.updateUserCount",
  146. cb: (stationId) => {
  147. let count = usersPerStationCount[stationId] || 0;
  148. utils.runJob("EMIT_TO_ROOM", {
  149. room: `station.${stationId}`,
  150. args: ["event:userCount.updated", count],
  151. });
  152. stations.runJob("GET_STATION", { stationId }).then(async (station) => {
  153. if (station.privacy === "public")
  154. utils.runJob("EMIT_TO_ROOM", {
  155. room: "home",
  156. args: ["event:userCount.updated", stationId, count],
  157. });
  158. else {
  159. let sockets = await utils.runJob("GET_ROOM_SOCKETS", {
  160. room: "home",
  161. });
  162. for (let socketId in sockets) {
  163. let socket = sockets[socketId];
  164. let session = sockets[socketId].session;
  165. if (session.sessionId) {
  166. cache
  167. .runJob("HGET", {
  168. table: "sessions",
  169. key: session.sessionId,
  170. })
  171. .then((session) => {
  172. if (session)
  173. db.runJob("GET_MODEL", {
  174. modelName: "user",
  175. }).then((userModel) =>
  176. userModel.findOne(
  177. { _id: session.userId },
  178. (err, user) => {
  179. if (user.role === "admin")
  180. socket.emit(
  181. "event:userCount.updated",
  182. stationId,
  183. count
  184. );
  185. else if (
  186. station.type ===
  187. "community" &&
  188. station.owner ===
  189. session.userId
  190. )
  191. socket.emit(
  192. "event:userCount.updated",
  193. stationId,
  194. count
  195. );
  196. }
  197. )
  198. );
  199. });
  200. }
  201. }
  202. }
  203. });
  204. },
  205. });
  206. cache.runJob("SUB", {
  207. channel: "station.queueLockToggled",
  208. cb: (data) => {
  209. utils.runJob("EMIT_TO_ROOM", {
  210. room: `station.${data.stationId}`,
  211. args: ["event:queueLockToggled", data.locked],
  212. });
  213. },
  214. });
  215. cache.runJob("SUB", {
  216. channel: "station.updatePartyMode",
  217. cb: (data) => {
  218. utils.runJob("EMIT_TO_ROOM", {
  219. room: `station.${data.stationId}`,
  220. args: ["event:partyMode.updated", data.partyMode],
  221. });
  222. },
  223. });
  224. cache.runJob("SUB", {
  225. channel: "privatePlaylist.selected",
  226. cb: (data) => {
  227. utils.runJob("EMIT_TO_ROOM", {
  228. room: `station.${data.stationId}`,
  229. args: ["event:privatePlaylist.selected", data.playlistId],
  230. });
  231. },
  232. });
  233. cache.runJob("SUB", {
  234. channel: "station.pause",
  235. cb: (stationId) => {
  236. stations.runJob("GET_STATION", { stationId }).then((station) => {
  237. utils.runJob("EMIT_TO_ROOM", {
  238. room: `station.${stationId}`,
  239. args: ["event:stations.pause", { pausedAt: station.pausedAt }],
  240. });
  241. });
  242. },
  243. });
  244. cache.runJob("SUB", {
  245. channel: "station.resume",
  246. cb: (stationId) => {
  247. stations.runJob("GET_STATION", { stationId }).then((station) => {
  248. utils.runJob("EMIT_TO_ROOM", {
  249. room: `station.${stationId}`,
  250. args: [
  251. "event:stations.resume",
  252. { timePaused: station.timePaused },
  253. ],
  254. });
  255. });
  256. },
  257. });
  258. cache.runJob("SUB", {
  259. channel: "station.queueUpdate",
  260. cb: (stationId) => {
  261. stations.runJob("GET_STATION", { stationId }).then((station) => {
  262. utils.runJob("EMIT_TO_ROOM", {
  263. room: `station.${stationId}`,
  264. args: ["event:queue.update", station.queue],
  265. });
  266. });
  267. },
  268. });
  269. cache.runJob("SUB", {
  270. channel: "station.voteSkipSong",
  271. cb: (stationId) => {
  272. utils.runJob("EMIT_TO_ROOM", {
  273. room: `station.${stationId}`,
  274. args: ["event:song.voteSkipSong"],
  275. });
  276. },
  277. });
  278. cache.runJob("SUB", {
  279. channel: "station.remove",
  280. cb: (stationId) => {
  281. utils.runJob("EMIT_TO_ROOM", {
  282. room: `station.${stationId}`,
  283. args: ["event:stations.remove"],
  284. });
  285. utils.runJob("EMIT_TO_ROOM", {
  286. room: "admin.stations",
  287. args: ["event:admin.station.removed", stationId],
  288. });
  289. },
  290. });
  291. cache.runJob("SUB", {
  292. channel: "station.create",
  293. cb: async (stationId) => {
  294. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  295. stations
  296. .runJob("INITIALIZE_STATION", { stationId })
  297. .then(async (response) => {
  298. const station = response.station;
  299. station.userCount = usersPerStationCount[stationId] || 0;
  300. utils.runJob("EMIT_TO_ROOM", {
  301. room: "admin.stations",
  302. args: ["event:admin.station.added", station],
  303. });
  304. // TODO If community, check if on whitelist
  305. if (station.privacy === "public")
  306. utils.runJob("EMIT_TO_ROOM", {
  307. room: "home",
  308. args: ["event:stations.created", station],
  309. });
  310. else {
  311. let sockets = await utils.runJob("GET_ROOM_SOCKETS", {
  312. room: "home",
  313. });
  314. for (let socketId in sockets) {
  315. let socket = sockets[socketId];
  316. let session = sockets[socketId].session;
  317. if (session.sessionId) {
  318. cache
  319. .runJob("HGET", {
  320. table: "sessions",
  321. key: session.sessionId,
  322. })
  323. .then((session) => {
  324. if (session) {
  325. userModel.findOne(
  326. { _id: session.userId },
  327. (err, user) => {
  328. if (user.role === "admin")
  329. socket.emit(
  330. "event:stations.created",
  331. station
  332. );
  333. else if (
  334. station.type ===
  335. "community" &&
  336. station.owner ===
  337. session.userId
  338. )
  339. socket.emit(
  340. "event:stations.created",
  341. station
  342. );
  343. }
  344. );
  345. }
  346. });
  347. }
  348. }
  349. }
  350. });
  351. },
  352. });
  353. module.exports = {
  354. /**
  355. * Get a list of all the stations
  356. *
  357. * @param session
  358. * @param cb
  359. * @return {{ status: String, stations: Array }}
  360. */
  361. index: (session, cb) => {
  362. async.waterfall(
  363. [
  364. (next) => {
  365. cache
  366. .runJob("HGETALL", { table: "stations" })
  367. .then((stations) => {
  368. next(null, stations);
  369. });
  370. },
  371. (stations, next) => {
  372. let resultStations = [];
  373. for (let id in stations) {
  374. resultStations.push(stations[id]);
  375. }
  376. next(null, stations);
  377. },
  378. (stationsArray, next) => {
  379. let resultStations = [];
  380. async.each(
  381. stationsArray,
  382. (station, next) => {
  383. async.waterfall(
  384. [
  385. (next) => {
  386. stations
  387. .runJob("CAN_USER_VIEW_STATION", {
  388. station,
  389. userId: session.userId,
  390. hideUnlisted: true
  391. })
  392. .then((exists) => {
  393. next(null, exists);
  394. })
  395. .catch(next);
  396. },
  397. ],
  398. (err, exists) => {
  399. if (err) console.log(err);
  400. station.userCount =
  401. usersPerStationCount[station._id] || 0;
  402. if (exists) resultStations.push(station);
  403. next();
  404. }
  405. );
  406. },
  407. () => {
  408. next(null, resultStations);
  409. }
  410. );
  411. },
  412. ],
  413. async (err, stations) => {
  414. if (err) {
  415. err = await utils.runJob("GET_ERROR", { error: err });
  416. console.log(
  417. "ERROR",
  418. "STATIONS_INDEX",
  419. `Indexing stations failed. "${err}"`
  420. );
  421. return cb({ status: "failure", message: err });
  422. }
  423. console.log(
  424. "SUCCESS",
  425. "STATIONS_INDEX",
  426. `Indexing stations successful.`,
  427. false
  428. );
  429. return cb({ status: "success", stations: stations });
  430. }
  431. );
  432. },
  433. /**
  434. * Obtains basic metadata of a station in order to format an activity
  435. *
  436. * @param session
  437. * @param stationId - the station id
  438. * @param cb
  439. */
  440. getStationForActivity: (session, stationId, cb) => {
  441. async.waterfall(
  442. [
  443. (next) => {
  444. stations
  445. .runJob("GET_STATION", { stationId })
  446. .then((station) => next(null, station))
  447. .catch(next);
  448. },
  449. ],
  450. async (err, station) => {
  451. if (err) {
  452. err = await utils.runJob("GET_ERROR", { error: err });
  453. console.log(
  454. "ERROR",
  455. "STATIONS_GET_STATION_FOR_ACTIVITY",
  456. `Failed to obtain metadata of station ${stationId} for activity formatting. "${err}"`
  457. );
  458. return cb({ status: "failure", message: err });
  459. } else {
  460. console.log(
  461. "SUCCESS",
  462. "STATIONS_GET_STATION_FOR_ACTIVITY",
  463. `Obtained metadata of station ${stationId} for activity formatting successfully.`
  464. );
  465. return cb({
  466. status: "success",
  467. data: {
  468. title: station.displayName,
  469. thumbnail: station.currentSong
  470. ? station.currentSong.thumbnail
  471. : "",
  472. },
  473. });
  474. }
  475. }
  476. );
  477. },
  478. /**
  479. * Verifies that a station exists
  480. *
  481. * @param session
  482. * @param stationName - the station name
  483. * @param cb
  484. */
  485. existsByName: (session, stationName, cb) => {
  486. async.waterfall(
  487. [
  488. (next) => {
  489. stations
  490. .runJob("GET_STATION_BY_NAME", { stationName })
  491. .then((station) => next(null, station))
  492. .catch(next);
  493. },
  494. (station, next) => {
  495. if (!station) return next(null, false);
  496. stations
  497. .runJob("CAN_USER_VIEW_STATION", {
  498. station,
  499. userId: session.userId,
  500. })
  501. .then((exists) => next(null, exists))
  502. .catch(next);
  503. },
  504. ],
  505. async (err, exists) => {
  506. if (err) {
  507. err = await utils.runJob("GET_ERROR", { error: err });
  508. console.log(
  509. "ERROR",
  510. "STATION_EXISTS_BY_NAME",
  511. `Checking if station "${stationName}" exists failed. "${err}"`
  512. );
  513. return cb({ status: "failure", message: err });
  514. }
  515. console.log(
  516. "SUCCESS",
  517. "STATION_EXISTS_BY_NAME",
  518. `Station "${stationName}" exists successfully.` /*, false*/
  519. );
  520. cb({ status: "success", exists });
  521. }
  522. );
  523. },
  524. /**
  525. * Gets the official playlist for a station
  526. *
  527. * @param session
  528. * @param stationId - the station id
  529. * @param cb
  530. */
  531. getPlaylist: (session, stationId, cb) => {
  532. async.waterfall(
  533. [
  534. (next) => {
  535. stations
  536. .runJob("GET_STATION", { stationId })
  537. .then((station) => next(null, station))
  538. .catch(next);
  539. },
  540. (station, next) => {
  541. stations
  542. .runJob("CAN_USER_VIEW_STATION", {
  543. station,
  544. userId: session.userId,
  545. })
  546. .then((canView) => {
  547. if (canView) return next(null, station);
  548. return next("Insufficient permissions.");
  549. })
  550. .catch((err) => {
  551. return next(err);
  552. });
  553. },
  554. (station, next) => {
  555. if (!station) return next("Station not found.");
  556. else if (station.type !== "official")
  557. return next("This is not an official station.");
  558. else next();
  559. },
  560. (next) => {
  561. cache
  562. .runJob("HGET", {
  563. table: "officialPlaylists",
  564. key: stationId,
  565. })
  566. .then((playlist) => next(null, playlist))
  567. .catch(next);
  568. },
  569. (playlist, next) => {
  570. if (!playlist) return next("Playlist not found.");
  571. next(null, playlist);
  572. },
  573. ],
  574. async (err, playlist) => {
  575. if (err) {
  576. err = await utils.runJob("GET_ERROR", { error: err });
  577. console.log(
  578. "ERROR",
  579. "STATIONS_GET_PLAYLIST",
  580. `Getting playlist for station "${stationId}" failed. "${err}"`
  581. );
  582. return cb({ status: "failure", message: err });
  583. } else {
  584. console.log(
  585. "SUCCESS",
  586. "STATIONS_GET_PLAYLIST",
  587. `Got playlist for station "${stationId}" successfully.`,
  588. false
  589. );
  590. cb({ status: "success", data: playlist.songs });
  591. }
  592. }
  593. );
  594. },
  595. /**
  596. * Joins the station by its name
  597. *
  598. * @param session
  599. * @param stationName - the station name
  600. * @param cb
  601. * @return {{ status: String, userCount: Integer }}
  602. */
  603. join: (session, stationName, cb) => {
  604. async.waterfall(
  605. [
  606. (next) => {
  607. stations
  608. .runJob("GET_STATION_BY_NAME", { stationName })
  609. .then((station) => next(null, station))
  610. .catch(next);
  611. },
  612. (station, next) => {
  613. if (!station) return next("Station not found.");
  614. stations
  615. .runJob("CAN_USER_VIEW_STATION", {
  616. station,
  617. userId: session.userId,
  618. })
  619. .then((canView) => {
  620. if (!canView) next("Not allowed to join station.");
  621. else next(null, station);
  622. })
  623. .catch((err) => {
  624. return next(err);
  625. });
  626. },
  627. (station, next) => {
  628. utils.runJob("SOCKET_JOIN_ROOM", {
  629. socketId: session.socketId,
  630. room: `station.${station._id}`,
  631. });
  632. let data = {
  633. _id: station._id,
  634. type: station.type,
  635. currentSong: station.currentSong,
  636. startedAt: station.startedAt,
  637. paused: station.paused,
  638. timePaused: station.timePaused,
  639. pausedAt: station.pausedAt,
  640. description: station.description,
  641. displayName: station.displayName,
  642. privacy: station.privacy,
  643. locked: station.locked,
  644. partyMode: station.partyMode,
  645. owner: station.owner,
  646. privatePlaylist: station.privatePlaylist,
  647. };
  648. userList[session.socketId] = station._id;
  649. next(null, data);
  650. },
  651. (data, next) => {
  652. data = JSON.parse(JSON.stringify(data));
  653. data.userCount = usersPerStationCount[data._id] || 0;
  654. data.users = usersPerStation[data._id] || [];
  655. if (!data.currentSong || !data.currentSong.title)
  656. return next(null, data);
  657. utils.runJob("SOCKET_JOIN_SONG_ROOM", {
  658. socketId: session.socketId,
  659. room: `song.${data.currentSong.songId}`,
  660. });
  661. data.currentSong.skipVotes =
  662. data.currentSong.skipVotes.length;
  663. songs
  664. .runJob("GET_SONG_FROM_ID", {
  665. songId: data.currentSong.songId,
  666. })
  667. .then((response) => {
  668. const song = response.song;
  669. if (song) {
  670. data.currentSong.likes = song.likes;
  671. data.currentSong.dislikes = song.dislikes;
  672. } else {
  673. data.currentSong.likes = -1;
  674. data.currentSong.dislikes = -1;
  675. }
  676. })
  677. .catch((err) => {
  678. data.currentSong.likes = -1;
  679. data.currentSong.dislikes = -1;
  680. })
  681. .finally(() => {
  682. next(null, data);
  683. });
  684. },
  685. ],
  686. async (err, data) => {
  687. if (err) {
  688. err = await utils.runJob("GET_ERROR", { error: err });
  689. console.log(
  690. "ERROR",
  691. "STATIONS_JOIN",
  692. `Joining station "${stationName}" failed. "${err}"`
  693. );
  694. return cb({ status: "failure", message: err });
  695. }
  696. console.log(
  697. "SUCCESS",
  698. "STATIONS_JOIN",
  699. `Joined station "${data._id}" successfully.`
  700. );
  701. cb({ status: "success", data });
  702. }
  703. );
  704. },
  705. /**
  706. * Toggles if a station is locked
  707. *
  708. * @param session
  709. * @param stationId - the station id
  710. * @param cb
  711. */
  712. toggleLock: hooks.ownerRequired(async (session, stationId, cb) => {
  713. const stationModel = await db.runJob("GET_MODEL", {
  714. modelName: "station",
  715. });
  716. async.waterfall(
  717. [
  718. (next) => {
  719. stations
  720. .runJob("GET_STATION", { stationId })
  721. .then((station) => next(null, station))
  722. .catch(next);
  723. },
  724. (station, next) => {
  725. stationModel.updateOne(
  726. { _id: stationId },
  727. { $set: { locked: !station.locked } },
  728. next
  729. );
  730. },
  731. (res, next) => {
  732. stations
  733. .runJob("UPDATE_STATION", { stationId })
  734. .then((station) => next(null, station))
  735. .catch(next);
  736. },
  737. ],
  738. async (err, station) => {
  739. if (err) {
  740. err = await utils.runJob("GET_ERROR", { error: err });
  741. console.log(
  742. "ERROR",
  743. "STATIONS_UPDATE_LOCKED_STATUS",
  744. `Toggling the queue lock for station "${stationId}" failed. "${err}"`
  745. );
  746. return cb({ status: "failure", message: err });
  747. } else {
  748. console.log(
  749. "SUCCESS",
  750. "STATIONS_UPDATE_LOCKED_STATUS",
  751. `Toggled the queue lock for station "${stationId}" successfully to "${station.locked}".`
  752. );
  753. cache.runJob("PUB", {
  754. channel: "station.queueLockToggled",
  755. value: {
  756. stationId,
  757. locked: station.locked,
  758. },
  759. });
  760. return cb({ status: "success", data: station.locked });
  761. }
  762. }
  763. );
  764. }),
  765. /**
  766. * Votes to skip a station
  767. *
  768. * @param session
  769. * @param stationId - the station id
  770. * @param cb
  771. */
  772. voteSkip: hooks.loginRequired(async (session, stationId, cb) => {
  773. const stationModel = await db.runJob("GET_MODEL", {
  774. modelName: "station",
  775. });
  776. let skipVotes = 0;
  777. let shouldSkip = false;
  778. async.waterfall(
  779. [
  780. (next) => {
  781. stations
  782. .runJob("GET_STATION", { stationId })
  783. .then((station) => next(null, station))
  784. .catch(next);
  785. },
  786. (station, next) => {
  787. if (!station) return next("Station not found.");
  788. stations
  789. .runJob("CAN_USER_VIEW_STATION", {
  790. station,
  791. userId: session.userId,
  792. })
  793. .then((canView) => {
  794. if (canView) return next(null, station);
  795. return next("Insufficient permissions.");
  796. })
  797. .catch((err) => {
  798. return next(err);
  799. });
  800. },
  801. (station, next) => {
  802. if (!station.currentSong)
  803. return next("There is currently no song to skip.");
  804. if (
  805. station.currentSong.skipVotes.indexOf(
  806. session.userId
  807. ) !== -1
  808. )
  809. return next(
  810. "You have already voted to skip this song."
  811. );
  812. next(null, station);
  813. },
  814. (station, next) => {
  815. stationModel.updateOne(
  816. { _id: stationId },
  817. { $push: { "currentSong.skipVotes": session.userId } },
  818. next
  819. );
  820. },
  821. (res, next) => {
  822. stations
  823. .runJob("UPDATE_STATION", { stationId })
  824. .then((station) => next(null, station))
  825. .catch(next);
  826. },
  827. (station, next) => {
  828. if (!station) return next("Station not found.");
  829. next(null, station);
  830. },
  831. (station, next) => {
  832. skipVotes = station.currentSong.skipVotes.length;
  833. utils
  834. .runJob("GET_ROOM_SOCKETS", {
  835. room: `station.${stationId}`,
  836. })
  837. .then((sockets) => next(null, sockets))
  838. .catch(next);
  839. },
  840. (sockets, next) => {
  841. if (sockets.length <= skipVotes) shouldSkip = true;
  842. next();
  843. },
  844. ],
  845. async (err, station) => {
  846. if (err) {
  847. err = await utils.runJob("GET_ERROR", { error: err });
  848. console.log(
  849. "ERROR",
  850. "STATIONS_VOTE_SKIP",
  851. `Vote skipping station "${stationId}" failed. "${err}"`
  852. );
  853. return cb({ status: "failure", message: err });
  854. }
  855. console.log(
  856. "SUCCESS",
  857. "STATIONS_VOTE_SKIP",
  858. `Vote skipping "${stationId}" successful.`
  859. );
  860. cache.runJob("PUB", {
  861. channel: "station.voteSkipSong",
  862. value: stationId,
  863. });
  864. cb({
  865. status: "success",
  866. message: "Successfully voted to skip the song.",
  867. });
  868. if (shouldSkip) stations.runJob("SKIP_STATION", { stationId });
  869. }
  870. );
  871. }),
  872. /**
  873. * Force skips a station
  874. *
  875. * @param session
  876. * @param stationId - the station id
  877. * @param cb
  878. */
  879. forceSkip: hooks.ownerRequired((session, stationId, cb) => {
  880. async.waterfall(
  881. [
  882. (next) => {
  883. stations
  884. .runJob("GET_STATION", { stationId })
  885. .then((station) => next(null, station))
  886. .catch(next);
  887. },
  888. (station, next) => {
  889. if (!station) return next("Station not found.");
  890. next();
  891. },
  892. ],
  893. async (err) => {
  894. if (err) {
  895. err = await utils.runJob("GET_ERROR", { error: err });
  896. console.log(
  897. "ERROR",
  898. "STATIONS_FORCE_SKIP",
  899. `Force skipping station "${stationId}" failed. "${err}"`
  900. );
  901. return cb({ status: "failure", message: err });
  902. }
  903. notifications.runJob("UNSCHEDULE", {
  904. name: `stations.nextSong?id=${stationId}`,
  905. });
  906. stations.runJob("SKIP_STATION", { stationId });
  907. console.log(
  908. "SUCCESS",
  909. "STATIONS_FORCE_SKIP",
  910. `Force skipped station "${stationId}" successfully.`
  911. );
  912. return cb({
  913. status: "success",
  914. message: "Successfully skipped station.",
  915. });
  916. }
  917. );
  918. }),
  919. /**
  920. * Leaves the user's current station
  921. *
  922. * @param session
  923. * @param stationId
  924. * @param cb
  925. * @return {{ status: String, userCount: Integer }}
  926. */
  927. leave: (session, stationId, cb) => {
  928. async.waterfall(
  929. [
  930. (next) => {
  931. stations
  932. .runJob("GET_STATION", { stationId })
  933. .then((station) => next(null, station))
  934. .catch(next);
  935. },
  936. (station, next) => {
  937. if (!station) return next("Station not found.");
  938. next();
  939. },
  940. ],
  941. async (err, userCount) => {
  942. if (err) {
  943. err = await utils.runJob("GET_ERROR", { error: err });
  944. console.log(
  945. "ERROR",
  946. "STATIONS_LEAVE",
  947. `Leaving station "${stationId}" failed. "${err}"`
  948. );
  949. return cb({ status: "failure", message: err });
  950. }
  951. console.log(
  952. "SUCCESS",
  953. "STATIONS_LEAVE",
  954. `Left station "${stationId}" successfully.`
  955. );
  956. utils.runJob("SOCKET_LEAVE_ROOMS", { socketId: session });
  957. delete userList[session.socketId];
  958. return cb({
  959. status: "success",
  960. message: "Successfully left station.",
  961. userCount,
  962. });
  963. }
  964. );
  965. },
  966. /**
  967. * Updates a station's name
  968. *
  969. * @param session
  970. * @param stationId - the station id
  971. * @param newName - the new station name
  972. * @param cb
  973. */
  974. updateName: hooks.ownerRequired(async (session, stationId, newName, cb) => {
  975. const stationModel = await db.runJob("GET_MODEL", {
  976. modelName: "station",
  977. });
  978. async.waterfall(
  979. [
  980. (next) => {
  981. stationModel.updateOne(
  982. { _id: stationId },
  983. { $set: { name: newName } },
  984. { runValidators: true },
  985. next
  986. );
  987. },
  988. (res, next) => {
  989. stations
  990. .runJob("UPDATE_STATION", { stationId })
  991. .then((station) => next(null, station))
  992. .catch(next);
  993. },
  994. ],
  995. async (err) => {
  996. if (err) {
  997. err = await utils.runJob("GET_ERROR", { error: err });
  998. console.log(
  999. "ERROR",
  1000. "STATIONS_UPDATE_NAME",
  1001. `Updating station "${stationId}" name to "${newName}" failed. "${err}"`
  1002. );
  1003. return cb({ status: "failure", message: err });
  1004. }
  1005. console.log(
  1006. "SUCCESS",
  1007. "STATIONS_UPDATE_NAME",
  1008. `Updated station "${stationId}" name to "${newName}" successfully.`
  1009. );
  1010. return cb({
  1011. status: "success",
  1012. message: "Successfully updated the name.",
  1013. });
  1014. }
  1015. );
  1016. }),
  1017. /**
  1018. * Updates a station's display name
  1019. *
  1020. * @param session
  1021. * @param stationId - the station id
  1022. * @param newDisplayName - the new station display name
  1023. * @param cb
  1024. */
  1025. updateDisplayName: hooks.ownerRequired(
  1026. async (session, stationId, newDisplayName, cb) => {
  1027. const stationModel = await db.runJob("GET_MODEL", {
  1028. modelName: "station",
  1029. });
  1030. async.waterfall(
  1031. [
  1032. (next) => {
  1033. stationModel.updateOne(
  1034. { _id: stationId },
  1035. { $set: { displayName: newDisplayName } },
  1036. { runValidators: true },
  1037. next
  1038. );
  1039. },
  1040. (res, next) => {
  1041. stations
  1042. .runJob("UPDATE_STATION", { stationId })
  1043. .then((station) => next(null, station))
  1044. .catch(next);
  1045. },
  1046. ],
  1047. async (err) => {
  1048. if (err) {
  1049. err = await utils.runJob("GET_ERROR", { error: err });
  1050. console.log(
  1051. "ERROR",
  1052. "STATIONS_UPDATE_DISPLAY_NAME",
  1053. `Updating station "${stationId}" displayName to "${newDisplayName}" failed. "${err}"`
  1054. );
  1055. return cb({ status: "failure", message: err });
  1056. }
  1057. console.log(
  1058. "SUCCESS",
  1059. "STATIONS_UPDATE_DISPLAY_NAME",
  1060. `Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`
  1061. );
  1062. return cb({
  1063. status: "success",
  1064. message: "Successfully updated the display name.",
  1065. });
  1066. }
  1067. );
  1068. }
  1069. ),
  1070. /**
  1071. * Updates a station's description
  1072. *
  1073. * @param session
  1074. * @param stationId - the station id
  1075. * @param newDescription - the new station description
  1076. * @param cb
  1077. */
  1078. updateDescription: hooks.ownerRequired(
  1079. async (session, stationId, newDescription, cb) => {
  1080. const stationModel = await db.runJob("GET_MODEL", {
  1081. modelName: "station",
  1082. });
  1083. async.waterfall(
  1084. [
  1085. (next) => {
  1086. stationModel.updateOne(
  1087. { _id: stationId },
  1088. { $set: { description: newDescription } },
  1089. { runValidators: true },
  1090. next
  1091. );
  1092. },
  1093. (res, next) => {
  1094. stations
  1095. .runJob("UPDATE_STATION", { stationId })
  1096. .then((station) => next(null, station))
  1097. .catch(next);
  1098. },
  1099. ],
  1100. async (err) => {
  1101. if (err) {
  1102. err = await utils.runJob("GET_ERROR", { error: err });
  1103. console.log(
  1104. "ERROR",
  1105. "STATIONS_UPDATE_DESCRIPTION",
  1106. `Updating station "${stationId}" description to "${newDescription}" failed. "${err}"`
  1107. );
  1108. return cb({ status: "failure", message: err });
  1109. }
  1110. console.log(
  1111. "SUCCESS",
  1112. "STATIONS_UPDATE_DESCRIPTION",
  1113. `Updated station "${stationId}" description to "${newDescription}" successfully.`
  1114. );
  1115. return cb({
  1116. status: "success",
  1117. message: "Successfully updated the description.",
  1118. });
  1119. }
  1120. );
  1121. }
  1122. ),
  1123. /**
  1124. * Updates a station's privacy
  1125. *
  1126. * @param session
  1127. * @param stationId - the station id
  1128. * @param newPrivacy - the new station privacy
  1129. * @param cb
  1130. */
  1131. updatePrivacy: hooks.ownerRequired(
  1132. async (session, stationId, newPrivacy, cb) => {
  1133. const stationModel = await db.runJob("GET_MODEL", {
  1134. modelName: "station",
  1135. });
  1136. async.waterfall(
  1137. [
  1138. (next) => {
  1139. stationModel.updateOne(
  1140. { _id: stationId },
  1141. { $set: { privacy: newPrivacy } },
  1142. { runValidators: true },
  1143. next
  1144. );
  1145. },
  1146. (res, next) => {
  1147. stations
  1148. .runJob("UPDATE_STATION", { stationId })
  1149. .then((station) => next(null, station))
  1150. .catch(next);
  1151. },
  1152. ],
  1153. async (err) => {
  1154. if (err) {
  1155. err = await utils.runJob("GET_ERROR", { error: err });
  1156. console.log(
  1157. "ERROR",
  1158. "STATIONS_UPDATE_PRIVACY",
  1159. `Updating station "${stationId}" privacy to "${newPrivacy}" failed. "${err}"`
  1160. );
  1161. return cb({ status: "failure", message: err });
  1162. }
  1163. console.log(
  1164. "SUCCESS",
  1165. "STATIONS_UPDATE_PRIVACY",
  1166. `Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`
  1167. );
  1168. return cb({
  1169. status: "success",
  1170. message: "Successfully updated the privacy.",
  1171. });
  1172. }
  1173. );
  1174. }
  1175. ),
  1176. /**
  1177. * Updates a station's genres
  1178. *
  1179. * @param session
  1180. * @param stationId - the station id
  1181. * @param newGenres - the new station genres
  1182. * @param cb
  1183. */
  1184. updateGenres: hooks.ownerRequired(
  1185. async (session, stationId, newGenres, cb) => {
  1186. const stationModel = await db.runJob("GET_MODEL", {
  1187. modelName: "station",
  1188. });
  1189. async.waterfall(
  1190. [
  1191. (next) => {
  1192. stationModel.updateOne(
  1193. { _id: stationId },
  1194. { $set: { genres: newGenres } },
  1195. { runValidators: true },
  1196. next
  1197. );
  1198. },
  1199. (res, next) => {
  1200. stations
  1201. .runJob("UPDATE_STATION", { stationId })
  1202. .then((station) => next(null, station))
  1203. .catch(next);
  1204. },
  1205. ],
  1206. async (err) => {
  1207. if (err) {
  1208. err = await utils.runJob("GET_ERROR", { error: err });
  1209. console.log(
  1210. "ERROR",
  1211. "STATIONS_UPDATE_GENRES",
  1212. `Updating station "${stationId}" genres to "${newGenres}" failed. "${err}"`
  1213. );
  1214. return cb({ status: "failure", message: err });
  1215. }
  1216. console.log(
  1217. "SUCCESS",
  1218. "STATIONS_UPDATE_GENRES",
  1219. `Updated station "${stationId}" genres to "${newGenres}" successfully.`
  1220. );
  1221. return cb({
  1222. status: "success",
  1223. message: "Successfully updated the genres.",
  1224. });
  1225. }
  1226. );
  1227. }
  1228. ),
  1229. /**
  1230. * Updates a station's blacklisted genres
  1231. *
  1232. * @param session
  1233. * @param stationId - the station id
  1234. * @param newBlacklistedGenres - the new station blacklisted genres
  1235. * @param cb
  1236. */
  1237. updateBlacklistedGenres: hooks.ownerRequired(
  1238. async (session, stationId, newBlacklistedGenres, cb) => {
  1239. const stationModel = await db.runJob("GET_MODEL", {
  1240. modelName: "station",
  1241. });
  1242. async.waterfall(
  1243. [
  1244. (next) => {
  1245. stationModel.updateOne(
  1246. { _id: stationId },
  1247. {
  1248. $set: {
  1249. blacklistedGenres: newBlacklistedGenres,
  1250. },
  1251. },
  1252. { runValidators: true },
  1253. next
  1254. );
  1255. },
  1256. (res, next) => {
  1257. stations
  1258. .runJob("UPDATE_STATION", { stationId })
  1259. .then((station) => next(null, station))
  1260. .catch(next);
  1261. },
  1262. ],
  1263. async (err) => {
  1264. if (err) {
  1265. err = await utils.runJob("GET_ERROR", { error: err });
  1266. console.log(
  1267. "ERROR",
  1268. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1269. `Updating station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" failed. "${err}"`
  1270. );
  1271. return cb({ status: "failure", message: err });
  1272. }
  1273. console.log(
  1274. "SUCCESS",
  1275. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1276. `Updated station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" successfully.`
  1277. );
  1278. return cb({
  1279. status: "success",
  1280. message: "Successfully updated the blacklisted genres.",
  1281. });
  1282. }
  1283. );
  1284. }
  1285. ),
  1286. /**
  1287. * Updates a station's party mode
  1288. *
  1289. * @param session
  1290. * @param stationId - the station id
  1291. * @param newPartyMode - the new station party mode
  1292. * @param cb
  1293. */
  1294. updatePartyMode: hooks.ownerRequired(
  1295. async (session, stationId, newPartyMode, cb) => {
  1296. const stationModel = await db.runJob("GET_MODEL", {
  1297. modelName: "station",
  1298. });
  1299. async.waterfall(
  1300. [
  1301. (next) => {
  1302. stations
  1303. .runJob("GET_STATION", { stationId })
  1304. .then((station) => next(null, station))
  1305. .catch(next);
  1306. },
  1307. (station, next) => {
  1308. if (!station) return next("Station not found.");
  1309. if (station.partyMode === newPartyMode)
  1310. return next(
  1311. "The party mode was already " +
  1312. (newPartyMode ? "enabled." : "disabled.")
  1313. );
  1314. stationModel.updateOne(
  1315. { _id: stationId },
  1316. { $set: { partyMode: newPartyMode } },
  1317. { runValidators: true },
  1318. next
  1319. );
  1320. },
  1321. (res, next) => {
  1322. stations
  1323. .runJob("UPDATE_STATION", { stationId })
  1324. .then((station) => next(null, station))
  1325. .catch(next);
  1326. },
  1327. ],
  1328. async (err) => {
  1329. if (err) {
  1330. err = await utils.runJob("GET_ERROR", { error: err });
  1331. console.log(
  1332. "ERROR",
  1333. "STATIONS_UPDATE_PARTY_MODE",
  1334. `Updating station "${stationId}" party mode to "${newPartyMode}" failed. "${err}"`
  1335. );
  1336. return cb({ status: "failure", message: err });
  1337. }
  1338. console.log(
  1339. "SUCCESS",
  1340. "STATIONS_UPDATE_PARTY_MODE",
  1341. `Updated station "${stationId}" party mode to "${newPartyMode}" successfully.`
  1342. );
  1343. cache.runJob("PUB", {
  1344. channel: "station.updatePartyMode",
  1345. value: {
  1346. stationId: stationId,
  1347. partyMode: newPartyMode,
  1348. },
  1349. });
  1350. stations.runJob("SKIP_STATION", { stationId });
  1351. return cb({
  1352. status: "success",
  1353. message: "Successfully updated the party mode.",
  1354. });
  1355. }
  1356. );
  1357. }
  1358. ),
  1359. /**
  1360. * Pauses a station
  1361. *
  1362. * @param session
  1363. * @param stationId - the station id
  1364. * @param cb
  1365. */
  1366. pause: hooks.ownerRequired(async (session, stationId, cb) => {
  1367. const stationModel = await db.runJob("GET_MODEL", {
  1368. modelName: "station",
  1369. });
  1370. async.waterfall(
  1371. [
  1372. (next) => {
  1373. stations
  1374. .runJob("GET_STATION", { stationId })
  1375. .then((station) => next(null, station))
  1376. .catch(next);
  1377. },
  1378. (station, next) => {
  1379. if (!station) return next("Station not found.");
  1380. if (station.paused)
  1381. return next("That station was already paused.");
  1382. stationModel.updateOne(
  1383. { _id: stationId },
  1384. { $set: { paused: true, pausedAt: Date.now() } },
  1385. next
  1386. );
  1387. },
  1388. (res, next) => {
  1389. stations
  1390. .runJob("UPDATE_STATION", { stationId })
  1391. .then((station) => next(null, station))
  1392. .catch(next);
  1393. },
  1394. ],
  1395. async (err) => {
  1396. if (err) {
  1397. err = await utils.runJob("GET_ERROR", { error: err });
  1398. console.log(
  1399. "ERROR",
  1400. "STATIONS_PAUSE",
  1401. `Pausing station "${stationId}" failed. "${err}"`
  1402. );
  1403. return cb({ status: "failure", message: err });
  1404. }
  1405. console.log(
  1406. "SUCCESS",
  1407. "STATIONS_PAUSE",
  1408. `Paused station "${stationId}" successfully.`
  1409. );
  1410. cache.runJob("PUB", {
  1411. channel: "station.pause",
  1412. value: stationId,
  1413. });
  1414. notifications.runJob("UNSCHEDULE", {
  1415. name: `stations.nextSong?id=${stationId}`,
  1416. });
  1417. return cb({
  1418. status: "success",
  1419. message: "Successfully paused.",
  1420. });
  1421. }
  1422. );
  1423. }),
  1424. /**
  1425. * Resumes a station
  1426. *
  1427. * @param session
  1428. * @param stationId - the station id
  1429. * @param cb
  1430. */
  1431. resume: hooks.ownerRequired(async (session, stationId, cb) => {
  1432. const stationModel = await db.runJob("GET_MODEL", {
  1433. modelName: "station",
  1434. });
  1435. async.waterfall(
  1436. [
  1437. (next) => {
  1438. stations
  1439. .runJob("GET_STATION", { stationId })
  1440. .then((station) => next(null, station))
  1441. .catch(next);
  1442. },
  1443. (station, next) => {
  1444. if (!station) return next("Station not found.");
  1445. if (!station.paused)
  1446. return next("That station is not paused.");
  1447. station.timePaused += Date.now() - station.pausedAt;
  1448. stationModel.updateOne(
  1449. { _id: stationId },
  1450. {
  1451. $set: { paused: false },
  1452. $inc: { timePaused: Date.now() - station.pausedAt },
  1453. },
  1454. next
  1455. );
  1456. },
  1457. (res, next) => {
  1458. stations
  1459. .runJob("UPDATE_STATION", { stationId })
  1460. .then((station) => next(null, station))
  1461. .catch(next);
  1462. },
  1463. ],
  1464. async (err) => {
  1465. if (err) {
  1466. err = await utils.runJob("GET_ERROR", { error: err });
  1467. console.log(
  1468. "ERROR",
  1469. "STATIONS_RESUME",
  1470. `Resuming station "${stationId}" failed. "${err}"`
  1471. );
  1472. return cb({ status: "failure", message: err });
  1473. }
  1474. console.log(
  1475. "SUCCESS",
  1476. "STATIONS_RESUME",
  1477. `Resuming station "${stationId}" successfully.`
  1478. );
  1479. cache.runJob("PUB", {
  1480. channel: "station.resume",
  1481. value: stationId,
  1482. });
  1483. return cb({
  1484. status: "success",
  1485. message: "Successfully resumed.",
  1486. });
  1487. }
  1488. );
  1489. }),
  1490. /**
  1491. * Removes a station
  1492. *
  1493. * @param session
  1494. * @param stationId - the station id
  1495. * @param cb
  1496. */
  1497. remove: hooks.ownerRequired(async (session, stationId, cb) => {
  1498. const stationModel = await db.runJob("GET_MODEL", {
  1499. modelName: "station",
  1500. });
  1501. async.waterfall(
  1502. [
  1503. (next) => {
  1504. stationModel.deleteOne({ _id: stationId }, (err) =>
  1505. next(err)
  1506. );
  1507. },
  1508. (next) => {
  1509. cache
  1510. .runJob("HDEL", { table: "stations", key: stationId })
  1511. .then(next)
  1512. .catch(next);
  1513. },
  1514. ],
  1515. async (err) => {
  1516. if (err) {
  1517. err = await utils.runJob("GET_ERROR", { error: err });
  1518. console.log(
  1519. "ERROR",
  1520. "STATIONS_REMOVE",
  1521. `Removing station "${stationId}" failed. "${err}"`
  1522. );
  1523. return cb({ status: "failure", message: err });
  1524. }
  1525. console.log(
  1526. "SUCCESS",
  1527. "STATIONS_REMOVE",
  1528. `Removing station "${stationId}" successfully.`
  1529. );
  1530. cache.runJob("PUB", {
  1531. channel: "station.remove",
  1532. value: stationId,
  1533. });
  1534. activities.runJob("ADD_ACTIVITY", {
  1535. userId: session.userId,
  1536. activityType: "deleted_station",
  1537. payload: [stationId],
  1538. });
  1539. return cb({
  1540. status: "success",
  1541. message: "Successfully removed.",
  1542. });
  1543. }
  1544. );
  1545. }),
  1546. /**
  1547. * Create a station
  1548. *
  1549. * @param session
  1550. * @param data - the station data
  1551. * @param cb
  1552. */
  1553. create: hooks.loginRequired(async (session, data, cb) => {
  1554. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  1555. const stationModel = await db.runJob("GET_MODEL", {
  1556. modelName: "station",
  1557. });
  1558. data.name = data.name.toLowerCase();
  1559. let blacklist = [
  1560. "country",
  1561. "edm",
  1562. "musare",
  1563. "hip-hop",
  1564. "rap",
  1565. "top-hits",
  1566. "todays-hits",
  1567. "old-school",
  1568. "christmas",
  1569. "about",
  1570. "support",
  1571. "staff",
  1572. "help",
  1573. "news",
  1574. "terms",
  1575. "privacy",
  1576. "profile",
  1577. "c",
  1578. "community",
  1579. "tos",
  1580. "login",
  1581. "register",
  1582. "p",
  1583. "official",
  1584. "o",
  1585. "trap",
  1586. "faq",
  1587. "team",
  1588. "donate",
  1589. "buy",
  1590. "shop",
  1591. "forums",
  1592. "explore",
  1593. "settings",
  1594. "admin",
  1595. "auth",
  1596. "reset_password",
  1597. ];
  1598. async.waterfall(
  1599. [
  1600. (next) => {
  1601. if (!data) return next("Invalid data.");
  1602. next();
  1603. },
  1604. (next) => {
  1605. stationModel.findOne(
  1606. {
  1607. $or: [
  1608. { name: data.name },
  1609. {
  1610. displayName: new RegExp(
  1611. `^${data.displayName}$`,
  1612. "i"
  1613. ),
  1614. },
  1615. ],
  1616. },
  1617. next
  1618. );
  1619. },
  1620. (station, next) => {
  1621. if (station)
  1622. return next(
  1623. "A station with that name or display name already exists."
  1624. );
  1625. const {
  1626. name,
  1627. displayName,
  1628. description,
  1629. genres,
  1630. playlist,
  1631. type,
  1632. blacklistedGenres,
  1633. } = data;
  1634. if (type === "official") {
  1635. userModel.findOne(
  1636. { _id: session.userId },
  1637. (err, user) => {
  1638. if (err) return next(err);
  1639. if (!user) return next("User not found.");
  1640. if (user.role !== "admin")
  1641. return next("Admin required.");
  1642. stationModel.create(
  1643. {
  1644. name,
  1645. displayName,
  1646. description,
  1647. type,
  1648. privacy: "private",
  1649. playlist,
  1650. genres,
  1651. blacklistedGenres,
  1652. currentSong: stations.defaultSong,
  1653. },
  1654. next
  1655. );
  1656. }
  1657. );
  1658. } else if (type === "community") {
  1659. if (blacklist.indexOf(name) !== -1)
  1660. return next(
  1661. "That name is blacklisted. Please use a different name."
  1662. );
  1663. stationModel.create(
  1664. {
  1665. name,
  1666. displayName,
  1667. description,
  1668. type,
  1669. privacy: "private",
  1670. owner: session.userId,
  1671. queue: [],
  1672. currentSong: null,
  1673. },
  1674. next
  1675. );
  1676. }
  1677. },
  1678. ],
  1679. async (err, station) => {
  1680. if (err) {
  1681. err = await utils.runJob("GET_ERROR", { error: err });
  1682. console.log(
  1683. "ERROR",
  1684. "STATIONS_CREATE",
  1685. `Creating station failed. "${err}"`
  1686. );
  1687. return cb({ status: "failure", message: err });
  1688. }
  1689. console.log(
  1690. "SUCCESS",
  1691. "STATIONS_CREATE",
  1692. `Created station "${station._id}" successfully.`
  1693. );
  1694. cache.runJob("PUB", {
  1695. channel: "station.create",
  1696. value: station._id,
  1697. });
  1698. activities.runJob("ADD_ACTIVITY", {
  1699. userId: session.userId,
  1700. activityType: "created_station",
  1701. payload: [station._id],
  1702. });
  1703. return cb({
  1704. status: "success",
  1705. message: "Successfully created station.",
  1706. });
  1707. }
  1708. );
  1709. }),
  1710. /**
  1711. * Adds song to station queue
  1712. *
  1713. * @param session
  1714. * @param stationId - the station id
  1715. * @param songId - the song id
  1716. * @param cb
  1717. */
  1718. addToQueue: hooks.loginRequired(async (session, stationId, songId, cb) => {
  1719. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  1720. const stationModel = await db.runJob("GET_MODEL", {
  1721. modelName: "station",
  1722. });
  1723. async.waterfall(
  1724. [
  1725. (next) => {
  1726. stations
  1727. .runJob("GET_STATION", { stationId })
  1728. .then((station) => next(null, station))
  1729. .catch(next);
  1730. },
  1731. (station, next) => {
  1732. if (!station) return next("Station not found.");
  1733. if (station.locked) {
  1734. userModel.findOne(
  1735. { _id: session.userId },
  1736. (err, user) => {
  1737. if (
  1738. user.role !== "admin" &&
  1739. station.owner !== session.userId
  1740. )
  1741. return next(
  1742. "Only owners and admins can add songs to a locked queue."
  1743. );
  1744. else return next(null, station);
  1745. }
  1746. );
  1747. } else {
  1748. return next(null, station);
  1749. }
  1750. },
  1751. (station, next) => {
  1752. if (station.type !== "community")
  1753. return next("That station is not a community station.");
  1754. stations
  1755. .runJob("CAN_USER_VIEW_STATION", {
  1756. station,
  1757. userId: session.userId,
  1758. })
  1759. .then((canView) => {
  1760. if (canView) return next(null, station);
  1761. return next("Insufficient permissions.");
  1762. })
  1763. .catch((err) => {
  1764. return next(err);
  1765. });
  1766. },
  1767. (station, next) => {
  1768. if (
  1769. station.currentSong &&
  1770. station.currentSong.songId === songId
  1771. )
  1772. return next("That song is currently playing.");
  1773. async.each(
  1774. station.queue,
  1775. (queueSong, next) => {
  1776. if (queueSong.songId === songId)
  1777. return next(
  1778. "That song is already in the queue."
  1779. );
  1780. next();
  1781. },
  1782. (err) => {
  1783. next(err, station);
  1784. }
  1785. );
  1786. },
  1787. (station, next) => {
  1788. // songs
  1789. // .runJob("GET_SONG", { id: songId })
  1790. // .then((song) => {
  1791. // if (song) return next(null, song, station);
  1792. // else {
  1793. utils
  1794. .runJob("GET_SONG_FROM_YOUTUBE", { songId })
  1795. .then((response) => {
  1796. const song = response.song;
  1797. song.artists = [];
  1798. song.skipDuration = 0;
  1799. song.likes = -1;
  1800. song.dislikes = -1;
  1801. song.thumbnail = "empty";
  1802. song.explicit = false;
  1803. next(null, song, station);
  1804. })
  1805. .catch((err) => {
  1806. next(err);
  1807. });
  1808. // }
  1809. // })
  1810. // .catch((err) => {
  1811. // next(err);
  1812. // });
  1813. },
  1814. (song, station, next) => {
  1815. let queue = station.queue;
  1816. song.requestedBy = session.userId;
  1817. queue.push(song);
  1818. let totalDuration = 0;
  1819. queue.forEach((song) => {
  1820. totalDuration += song.duration;
  1821. });
  1822. if (totalDuration >= 3600 * 3)
  1823. return next("The max length of the queue is 3 hours.");
  1824. next(null, song, station);
  1825. },
  1826. (song, station, next) => {
  1827. let queue = station.queue;
  1828. if (queue.length === 0) return next(null, song, station);
  1829. let totalDuration = 0;
  1830. const userId = queue[queue.length - 1].requestedBy;
  1831. station.queue.forEach((song) => {
  1832. if (userId === song.requestedBy) {
  1833. totalDuration += song.duration;
  1834. }
  1835. });
  1836. if (totalDuration >= 900)
  1837. return next(
  1838. "The max length of songs per user is 15 minutes."
  1839. );
  1840. next(null, song, station);
  1841. },
  1842. (song, station, next) => {
  1843. let queue = station.queue;
  1844. if (queue.length === 0) return next(null, song);
  1845. let totalSongs = 0;
  1846. const userId = queue[queue.length - 1].requestedBy;
  1847. queue.forEach((song) => {
  1848. if (userId === song.requestedBy) {
  1849. totalSongs++;
  1850. }
  1851. });
  1852. if (totalSongs <= 2) return next(null, song);
  1853. if (totalSongs > 3)
  1854. return next(
  1855. "The max amount of songs per user is 3, and only 2 in a row is allowed."
  1856. );
  1857. if (
  1858. queue[queue.length - 2].requestedBy !== userId ||
  1859. queue[queue.length - 3] !== userId
  1860. )
  1861. return next(
  1862. "The max amount of songs per user is 3, and only 2 in a row is allowed."
  1863. );
  1864. next(null, song);
  1865. },
  1866. (song, next) => {
  1867. stationModel.updateOne(
  1868. { _id: stationId },
  1869. { $push: { queue: song } },
  1870. { runValidators: true },
  1871. next
  1872. );
  1873. },
  1874. (res, next) => {
  1875. stations
  1876. .runJob("UPDATE_STATION", { stationId })
  1877. .then((station) => next(null, station))
  1878. .catch(next);
  1879. },
  1880. ],
  1881. async (err, station) => {
  1882. if (err) {
  1883. err = await utils.runJob("GET_ERROR", { error: err });
  1884. console.log(
  1885. "ERROR",
  1886. "STATIONS_ADD_SONG_TO_QUEUE",
  1887. `Adding song "${songId}" to station "${stationId}" queue failed. "${err}"`
  1888. );
  1889. return cb({ status: "failure", message: err });
  1890. }
  1891. console.log(
  1892. "SUCCESS",
  1893. "STATIONS_ADD_SONG_TO_QUEUE",
  1894. `Added song "${songId}" to station "${stationId}" successfully.`
  1895. );
  1896. cache.runJob("PUB", {
  1897. channel: "station.queueUpdate",
  1898. value: stationId,
  1899. });
  1900. return cb({
  1901. status: "success",
  1902. message: "Successfully added song to queue.",
  1903. });
  1904. }
  1905. );
  1906. }),
  1907. /**
  1908. * Removes song from station queue
  1909. *
  1910. * @param session
  1911. * @param stationId - the station id
  1912. * @param songId - the song id
  1913. * @param cb
  1914. */
  1915. removeFromQueue: hooks.ownerRequired(
  1916. async (session, stationId, songId, cb) => {
  1917. const stationModel = await db.runJob("GET_MODEL", {
  1918. modelName: "station",
  1919. });
  1920. async.waterfall(
  1921. [
  1922. (next) => {
  1923. if (!songId) return next("Invalid song id.");
  1924. stations
  1925. .runJob("GET_STATION", { stationId })
  1926. .then((station) => next(null, station))
  1927. .catch(next);
  1928. },
  1929. (station, next) => {
  1930. if (!station) return next("Station not found.");
  1931. if (station.type !== "community")
  1932. return next("Station is not a community station.");
  1933. async.each(
  1934. station.queue,
  1935. (queueSong, next) => {
  1936. if (queueSong.songId === songId)
  1937. return next(true);
  1938. next();
  1939. },
  1940. (err) => {
  1941. if (err === true) return next();
  1942. next("Song is not currently in the queue.");
  1943. }
  1944. );
  1945. },
  1946. (next) => {
  1947. stationModel.updateOne(
  1948. { _id: stationId },
  1949. { $pull: { queue: { songId: songId } } },
  1950. next
  1951. );
  1952. },
  1953. (res, next) => {
  1954. stations
  1955. .runJob("UPDATE_STATION", { stationId })
  1956. .then((station) => next(null, station))
  1957. .catch(next);
  1958. },
  1959. ],
  1960. async (err, station) => {
  1961. if (err) {
  1962. err = await utils.runJob("GET_ERROR", { error: err });
  1963. console.log(
  1964. "ERROR",
  1965. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1966. `Removing song "${songId}" from station "${stationId}" queue failed. "${err}"`
  1967. );
  1968. return cb({ status: "failure", message: err });
  1969. }
  1970. console.log(
  1971. "SUCCESS",
  1972. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1973. `Removed song "${songId}" from station "${stationId}" successfully.`
  1974. );
  1975. cache.runJob("PUB", {
  1976. channel: "station.queueUpdate",
  1977. value: stationId,
  1978. });
  1979. return cb({
  1980. status: "success",
  1981. message: "Successfully removed song from queue.",
  1982. });
  1983. }
  1984. );
  1985. }
  1986. ),
  1987. /**
  1988. * Gets the queue from a station
  1989. *
  1990. * @param session
  1991. * @param stationId - the station id
  1992. * @param cb
  1993. */
  1994. getQueue: (session, stationId, cb) => {
  1995. async.waterfall(
  1996. [
  1997. (next) => {
  1998. stations
  1999. .runJob("GET_STATION", { stationId })
  2000. .then((station) => next(null, station))
  2001. .catch(next);
  2002. },
  2003. (station, next) => {
  2004. if (!station) return next("Station not found.");
  2005. if (station.type !== "community")
  2006. return next("Station is not a community station.");
  2007. next(null, station);
  2008. },
  2009. (station, next) => {
  2010. stations
  2011. .runJob("CAN_USER_VIEW_STATION", {
  2012. station,
  2013. userId: session.userId,
  2014. })
  2015. .then((canView) => {
  2016. if (canView) return next(null, station);
  2017. return next("Insufficient permissions.");
  2018. })
  2019. .catch((err) => {
  2020. return next(err);
  2021. });
  2022. },
  2023. ],
  2024. async (err, station) => {
  2025. if (err) {
  2026. err = await utils.runJob("GET_ERROR", { error: err });
  2027. console.log(
  2028. "ERROR",
  2029. "STATIONS_GET_QUEUE",
  2030. `Getting queue for station "${stationId}" failed. "${err}"`
  2031. );
  2032. return cb({ status: "failure", message: err });
  2033. }
  2034. console.log(
  2035. "SUCCESS",
  2036. "STATIONS_GET_QUEUE",
  2037. `Got queue for station "${stationId}" successfully.`
  2038. );
  2039. return cb({
  2040. status: "success",
  2041. message: "Successfully got queue.",
  2042. queue: station.queue,
  2043. });
  2044. }
  2045. );
  2046. },
  2047. /**
  2048. * Selects a private playlist for a station
  2049. *
  2050. * @param session
  2051. * @param stationId - the station id
  2052. * @param playlistId - the private playlist id
  2053. * @param cb
  2054. */
  2055. selectPrivatePlaylist: hooks.ownerRequired(
  2056. async (session, stationId, playlistId, cb) => {
  2057. const stationModel = await db.runJob("GET_MODEL", {
  2058. modelName: "station",
  2059. });
  2060. const playlistModel = await db.runJob("GET_MODEL", {
  2061. modelName: "playlist",
  2062. });
  2063. async.waterfall(
  2064. [
  2065. (next) => {
  2066. stations
  2067. .runJob("GET_STATION", { stationId })
  2068. .then((station) => next(null, station))
  2069. .catch(next);
  2070. },
  2071. (station, next) => {
  2072. if (!station) return next("Station not found.");
  2073. if (station.type !== "community")
  2074. return next("Station is not a community station.");
  2075. if (station.privatePlaylist === playlistId)
  2076. return next(
  2077. "That private playlist is already selected."
  2078. );
  2079. playlistModel.findOne({ _id: playlistId }, next);
  2080. },
  2081. (playlist, next) => {
  2082. if (!playlist) return next("Playlist not found.");
  2083. let currentSongIndex =
  2084. playlist.songs.length > 0
  2085. ? playlist.songs.length - 1
  2086. : 0;
  2087. stationModel.updateOne(
  2088. { _id: stationId },
  2089. {
  2090. $set: {
  2091. privatePlaylist: playlistId,
  2092. currentSongIndex: currentSongIndex,
  2093. },
  2094. },
  2095. { runValidators: true },
  2096. next
  2097. );
  2098. },
  2099. (res, next) => {
  2100. stations
  2101. .runJob("UPDATE_STATION", { stationId })
  2102. .then((station) => next(null, station))
  2103. .catch(next);
  2104. },
  2105. ],
  2106. async (err, station) => {
  2107. if (err) {
  2108. err = await utils.runJob("GET_ERROR", { error: err });
  2109. console.log(
  2110. "ERROR",
  2111. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  2112. `Selecting private playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2113. );
  2114. return cb({ status: "failure", message: err });
  2115. }
  2116. console.log(
  2117. "SUCCESS",
  2118. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  2119. `Selected private playlist "${playlistId}" for station "${stationId}" successfully.`
  2120. );
  2121. notifications.runJob("UNSCHEDULE", {
  2122. name: `stations.nextSong?id${stationId}`,
  2123. });
  2124. if (!station.partyMode)
  2125. stations.runJob("SKIP_STATION", { stationId });
  2126. cache.runJob("PUB", {
  2127. channel: "privatePlaylist.selected",
  2128. value: {
  2129. playlistId,
  2130. stationId,
  2131. },
  2132. });
  2133. return cb({
  2134. status: "success",
  2135. message: "Successfully selected playlist.",
  2136. });
  2137. }
  2138. );
  2139. }
  2140. ),
  2141. favoriteStation: hooks.loginRequired(async (session, stationId, cb) => {
  2142. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  2143. async.waterfall(
  2144. [
  2145. (next) => {
  2146. stations
  2147. .runJob("GET_STATION", { stationId })
  2148. .then((station) => next(null, station))
  2149. .catch(next);
  2150. },
  2151. (station, next) => {
  2152. if (!station) return next("Station not found.");
  2153. stations
  2154. .runJob("CAN_USER_VIEW_STATION", {
  2155. station,
  2156. userId: session.userId,
  2157. })
  2158. .then((canView) => {
  2159. if (canView) return next();
  2160. return next("Insufficient permissions.");
  2161. })
  2162. .catch((err) => {
  2163. return next(err);
  2164. });
  2165. },
  2166. (next) => {
  2167. userModel.updateOne(
  2168. { _id: session.userId },
  2169. { $addToSet: { favoriteStations: stationId } },
  2170. next
  2171. );
  2172. },
  2173. (res, next) => {
  2174. if (res.nModified === 0)
  2175. return next("The station was already favorited.");
  2176. next();
  2177. },
  2178. ],
  2179. async (err) => {
  2180. if (err) {
  2181. err = await utils.runJob("GET_ERROR", { error: err });
  2182. console.log(
  2183. "ERROR",
  2184. "FAVORITE_STATION",
  2185. `Favoriting station "${stationId}" failed. "${err}"`
  2186. );
  2187. return cb({ status: "failure", message: err });
  2188. }
  2189. console.log(
  2190. "SUCCESS",
  2191. "FAVORITE_STATION",
  2192. `Favorited station "${stationId}" successfully.`
  2193. );
  2194. cache.runJob("PUB", {
  2195. channel: "user.favoritedStation",
  2196. value: {
  2197. userId: session.userId,
  2198. stationId,
  2199. },
  2200. });
  2201. return cb({
  2202. status: "success",
  2203. message: "Succesfully favorited station.",
  2204. });
  2205. }
  2206. );
  2207. }),
  2208. unfavoriteStation: hooks.loginRequired(async (session, stationId, cb) => {
  2209. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  2210. async.waterfall(
  2211. [
  2212. (next) => {
  2213. userModel.updateOne(
  2214. { _id: session.userId },
  2215. { $pull: { favoriteStations: stationId } },
  2216. next
  2217. );
  2218. },
  2219. (res, next) => {
  2220. if (res.nModified === 0)
  2221. return next("The station wasn't favorited.");
  2222. next();
  2223. },
  2224. ],
  2225. async (err) => {
  2226. if (err) {
  2227. err = await utils.runJob("GET_ERROR", { error: err });
  2228. console.log(
  2229. "ERROR",
  2230. "UNFAVORITE_STATION",
  2231. `Unfavoriting station "${stationId}" failed. "${err}"`
  2232. );
  2233. return cb({ status: "failure", message: err });
  2234. }
  2235. console.log(
  2236. "SUCCESS",
  2237. "UNFAVORITE_STATION",
  2238. `Unfavorited station "${stationId}" successfully.`
  2239. );
  2240. cache.runJob("PUB", {
  2241. channel: "user.unfavoritedStation",
  2242. value: {
  2243. userId: session.userId,
  2244. stationId,
  2245. },
  2246. });
  2247. return cb({
  2248. status: "success",
  2249. message: "Succesfully unfavorited station.",
  2250. });
  2251. }
  2252. );
  2253. }),
  2254. };