stations.js 86 KB


  1. "use strict";
  2. const async = require("async"),
  3. request = require("request"),
  4. config = require("config"),
  5. _ = require("underscore")._;
  6. const hooks = require("./hooks");
  7. const db = require("../db");
  8. const cache = require("../cache");
  9. const notifications = require("../notifications");
  10. const utils = require("../utils");
  11. const stations = require("../stations");
  12. const songs = require("../songs");
  13. const activities = require("../activities");
  14. // const logger = moduleManager.modules["logger"];
  // Module-level presence state used by the handlers in this file.
  // userList: socket id -> id of the station that socket joined
  // (assigned in `join`, deleted in `leave`).
  15. let userList = {};
  // usersPerStation: station id -> user list emitted with "event:users.updated";
  // readers fall back to [] when a station has no entry.
  16. let usersPerStation = {};
  // usersPerStationCount: station id -> user count; readers fall back to 0.
  // NOTE(review): in the part of the file visible here, the only code that fills
  // the two per-station maps is the commented-out interval below, so they appear
  // to stay empty while it is disabled — confirm against the rest of the file.
  17. let usersPerStationCount = {};
  18. // Temporarily disabled until the messages in console can be limited
  19. // setInterval(async () => {
  20. // let stationsCountUpdated = [];
  21. // let stationsUpdated = [];
  22. // let oldUsersPerStation = usersPerStation;
  23. // usersPerStation = {};
  24. // let oldUsersPerStationCount = usersPerStationCount;
  25. // usersPerStationCount = {};
  26. // const userModel = await db.runJob("GET_MODEL", {
  27. // modelName: "user",
  28. // });
  29. //
  30. // async.each(
  31. // Object.keys(userList),
  32. // function(socketId, next) {
  33. // utils.runJob("SOCKET_FROM_SESSION", { socketId }).then((socket) => {
  34. // let stationId = userList[socketId];
  35. // if (
  36. // !socket ||
  37. // Object.keys(socket.rooms).indexOf(
  38. // `station.${stationId}`
  39. // ) === -1
  40. // ) {
  41. // if (stationsCountUpdated.indexOf(stationId) === -1)
  42. // stationsCountUpdated.push(stationId);
  43. // if (stationsUpdated.indexOf(stationId) === -1)
  44. // stationsUpdated.push(stationId);
  45. // delete userList[socketId];
  46. // return next();
  47. // }
  48. // if (!usersPerStationCount[stationId])
  49. // usersPerStationCount[stationId] = 0;
  50. // usersPerStationCount[stationId]++;
  51. // if (!usersPerStation[stationId])
  52. // usersPerStation[stationId] = [];
  53. // async.waterfall(
  54. // [
  55. // (next) => {
  56. // if (!socket.session || !socket.session.sessionId)
  57. // return next("No session found.");
  58. // cache
  59. // .runJob("HGET", {
  60. // table: "sessions",
  61. // key: socket.session.sessionId,
  62. // })
  63. // .then((session) => next(null, session))
  64. // .catch(next);
  65. // },
  66. // (session, next) => {
  67. // if (!session) return next("Session not found.");
  68. // userModel.findOne({ _id: session.userId }, next);
  69. // },
  70. // (user, next) => {
  71. // if (!user) return next("User not found.");
  72. // if (
  73. // usersPerStation[stationId].indexOf(
  74. // user.username
  75. // ) !== -1
  76. // )
  77. // return next("User already in the list.");
  78. // next(null, user.username);
  79. // },
  80. // ],
  81. // (err, username) => {
  82. // if (!err) {
  83. // usersPerStation[stationId].push(username);
  84. // }
  85. // next();
  86. // }
  87. // );
  88. // });
  89. // //TODO Code to show users
  90. // },
  91. // (err) => {
  92. // for (let stationId in usersPerStationCount) {
  93. // if (
  94. // oldUsersPerStationCount[stationId] !==
  95. // usersPerStationCount[stationId]
  96. // ) {
  97. // if (stationsCountUpdated.indexOf(stationId) === -1)
  98. // stationsCountUpdated.push(stationId);
  99. // }
  100. // }
  101. // for (let stationId in usersPerStation) {
  102. // if (
  103. // _.difference(
  104. // usersPerStation[stationId],
  105. // oldUsersPerStation[stationId]
  106. // ).length > 0 ||
  107. // _.difference(
  108. // oldUsersPerStation[stationId],
  109. // usersPerStation[stationId]
  110. // ).length > 0
  111. // ) {
  112. // if (stationsUpdated.indexOf(stationId) === -1)
  113. // stationsUpdated.push(stationId);
  114. // }
  115. // }
  116. // stationsCountUpdated.forEach((stationId) => {
  117. // //console.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  118. // cache.runJob("PUB", {
  119. // table: "station.updateUserCount",
  120. // value: stationId,
  121. // });
  122. // });
  123. // stationsUpdated.forEach((stationId) => {
  124. // //console.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  125. // cache.runJob("PUB", {
  126. // table: "station.updateUsers",
  127. // value: stationId,
  128. // });
  129. // });
  130. // //console.log("Userlist", usersPerStation);
  131. // }
  132. // );
  133. // }, 3000);
  // Whenever "station.updateUsers" is published, send the station's current
  // user list (or an empty list) to everyone in that station's room.
  cache.runJob("SUB", {
    channel: "station.updateUsers",
    cb: (stationId) => {
      const users = usersPerStation[stationId] || [];
      const room = `station.${stationId}`;
      utils.runJob("EMIT_TO_ROOM", { room, args: ["event:users.updated", users] });
    },
  });
  // Whenever "station.updateUserCount" is published, send the new count to the
  // station's room and to the "home" room. Public stations are announced to the
  // whole home room; any other station is announced only to home sockets whose
  // user is an admin or the owner of a community station.
  cache.runJob("SUB", {
    channel: "station.updateUserCount",
    cb: (stationId) => {
      const count = usersPerStationCount[stationId] || 0;
      utils.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:userCount.updated", count],
      });

      const logFailure = (err) =>
        console.log(
          "ERROR",
          "STATION_UPDATE_USER_COUNT",
          `Announcing user count of station "${stationId}" failed. "${err}"`
        );

      stations
        .runJob("GET_STATION", { stationId })
        .then(async (station) => {
          // The station may be gone by the time the message arrives.
          if (!station) return;

          if (station.privacy === "public") {
            utils.runJob("EMIT_TO_ROOM", {
              room: "home",
              args: ["event:userCount.updated", stationId, count],
            });
            return;
          }

          const sockets = await utils.runJob("GET_ROOM_SOCKETS", { room: "home" });
          // Loop-invariant, so resolve the model once instead of once per socket.
          const userModel = await db.runJob("GET_MODEL", { modelName: "user" });

          for (const socketId in sockets) {
            const socket = sockets[socketId];
            // Sockets without a session cannot be matched to a user.
            const sessionId = socket.session && socket.session.sessionId;
            if (!sessionId) continue;

            cache
              .runJob("HGET", { table: "sessions", key: sessionId })
              .then((session) => {
                if (!session) return;
                userModel.findOne({ _id: session.userId }, (err, user) => {
                  // A failed lookup or a deleted user must not crash the handler.
                  if (err || !user) return;
                  const isAdmin = user.role === "admin";
                  const isOwner =
                    station.type === "community" &&
                    station.owner === session.userId;
                  if (isAdmin || isOwner)
                    socket.emit("event:userCount.updated", stationId, count);
                });
              })
              .catch(logFailure);
          }
        })
        .catch(logFailure);
    },
  });
  // Relay a queue-lock change to every socket in the affected station's room.
  cache.runJob("SUB", {
    channel: "station.queueLockToggled",
    cb: ({ stationId, locked }) => {
      const room = `station.${stationId}`;
      utils.runJob("EMIT_TO_ROOM", {
        room,
        args: ["event:queueLockToggled", locked],
      });
    },
  });
  // Relay a party-mode change to every socket in the affected station's room.
  cache.runJob("SUB", {
    channel: "station.updatePartyMode",
    cb: (update) => {
      const room = `station.${update.stationId}`;
      const args = ["event:partyMode.updated", update.partyMode];
      utils.runJob("EMIT_TO_ROOM", { room, args });
    },
  });
  // Tell a station's room which private playlist has just been selected for it.
  cache.runJob("SUB", {
    channel: "privatePlaylist.selected",
    cb: (selection) => {
      const target = `station.${selection.stationId}`;
      utils.runJob("EMIT_TO_ROOM", {
        room: target,
        args: ["event:privatePlaylist.selected", selection.playlistId],
      });
    },
  });
  // On "station.pause", look the station up and send its `pausedAt` value to the
  // station's room.
  cache.runJob("SUB", {
    channel: "station.pause",
    cb: (stationId) => {
      const announcePause = (station) => {
        const room = `station.${stationId}`;
        const payload = { pausedAt: station.pausedAt };
        utils.runJob("EMIT_TO_ROOM", {
          room,
          args: ["event:stations.pause", payload],
        });
      };
      stations.runJob("GET_STATION", { stationId }).then(announcePause);
    },
  });
  // On "station.resume", look the station up and send its `timePaused` value to
  // the station's room.
  cache.runJob("SUB", {
    channel: "station.resume",
    cb: (stationId) => {
      const announceResume = (station) => {
        const room = `station.${stationId}`;
        const payload = { timePaused: station.timePaused };
        utils.runJob("EMIT_TO_ROOM", {
          room,
          args: ["event:stations.resume", payload],
        });
      };
      stations.runJob("GET_STATION", { stationId }).then(announceResume);
    },
  });
  // On "station.queueUpdate", look the station up and send its queue to the
  // station's room.
  cache.runJob("SUB", {
    channel: "station.queueUpdate",
    cb: (stationId) => {
      const announceQueue = (station) => {
        const room = `station.${stationId}`;
        utils.runJob("EMIT_TO_ROOM", {
          room,
          args: ["event:queue.update", station.queue],
        });
      };
      stations.runJob("GET_STATION", { stationId }).then(announceQueue);
    },
  });
  // Notify a station's room that a skip vote was cast; the event has no payload.
  cache.runJob("SUB", {
    channel: "station.voteSkipSong",
    cb: (stationId) => {
      const room = `station.${stationId}`;
      const args = ["event:song.voteSkipSong"];
      utils.runJob("EMIT_TO_ROOM", { room, args });
    },
  });
  // Announce a station's removal: first to the station's own room, then to the
  // admin stations room (which also receives the station id).
  cache.runJob("SUB", {
    channel: "station.remove",
    cb: (stationId) => {
      const announcements = [
        { room: `station.${stationId}`, args: ["event:stations.remove"] },
        { room: "admin.stations", args: ["event:admin.station.removed", stationId] },
      ];
      for (const announcement of announcements) {
        utils.runJob("EMIT_TO_ROOM", announcement);
      }
    },
  });
  // On "station.create", initialise the station and announce it: always to the
  // admin stations room, then to the "home" room. Public stations are announced
  // to the whole home room; any other station is announced only to home sockets
  // whose user is an admin or the owner of a community station.
  cache.runJob("SUB", {
    channel: "station.create",
    cb: async (stationId) => {
      const logFailure = (err) =>
        console.log(
          "ERROR",
          "STATION_CREATE_ANNOUNCE",
          `Announcing creation of station "${stationId}" failed. "${err}"`
        );

      try {
        const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
        const response = await stations.runJob("INITIALIZE_STATION", { stationId });
        const station = response.station;
        station.userCount = usersPerStationCount[stationId] || 0;

        utils.runJob("EMIT_TO_ROOM", {
          room: "admin.stations",
          args: ["event:admin.station.added", station],
        });

        // TODO If community, check if on whitelist
        if (station.privacy === "public") {
          utils.runJob("EMIT_TO_ROOM", {
            room: "home",
            args: ["event:stations.created", station],
          });
          return;
        }

        const sockets = await utils.runJob("GET_ROOM_SOCKETS", { room: "home" });
        for (const socketId in sockets) {
          const socket = sockets[socketId];
          // Sockets without a session cannot be matched to a user.
          const sessionId = socket.session && socket.session.sessionId;
          if (!sessionId) continue;

          cache
            .runJob("HGET", { table: "sessions", key: sessionId })
            .then((session) => {
              if (!session) return;
              userModel.findOne({ _id: session.userId }, (err, user) => {
                // A failed lookup or a deleted user must not crash the handler.
                if (err || !user) return;
                const isAdmin = user.role === "admin";
                const isOwner =
                  station.type === "community" &&
                  station.owner === session.userId;
                if (isAdmin || isOwner)
                  socket.emit("event:stations.created", station);
              });
            })
            .catch(logFailure);
        }
      } catch (err) {
        // Without this, a rejected job would surface as an unhandled rejection.
        logFailure(err);
      }
    },
  });
  353. module.exports = {
  354. /**
  355. * Get a list of all the stations
  356. *
  357. * @param session
  358. * @param cb
  359. * @return {{ status: String, stations: Array }}
  360. */
  361. index: (session, cb) => {
  362. async.waterfall(
  363. [
  364. (next) => {
  365. console.log(111);
  366. cache
  367. .runJob("HGETALL", { table: "stations" })
  368. .then((stations) => {
  369. next(null, stations);
  370. });
  371. },
  372. (stations, next) => {
  373. console.log(222);
  374. let resultStations = [];
  375. for (let id in stations) {
  376. resultStations.push(stations[id]);
  377. }
  378. next(null, stations);
  379. },
  380. (stationsArray, next) => {
  381. console.log(333);
  382. let resultStations = [];
  383. async.each(
  384. stationsArray,
  385. (station, next) => {
  386. async.waterfall(
  387. [
  388. (next) => {
  389. stations
  390. .runJob("CAN_USER_VIEW_STATION", {
  391. station,
  392. userId: session.userId,
  393. })
  394. .then((exists) => {
  395. console.log(444, exists);
  396. next(null, exists);
  397. })
  398. .catch(next);
  399. },
  400. ],
  401. (err, exists) => {
  402. if (err) console.log(err);
  403. station.userCount =
  404. usersPerStationCount[station._id] || 0;
  405. if (exists) resultStations.push(station);
  406. next();
  407. }
  408. );
  409. },
  410. () => {
  411. next(null, resultStations);
  412. }
  413. );
  414. },
  415. ],
  416. async (err, stations) => {
  417. if (err) {
  418. err = await utils.runJob("GET_ERROR", { error: err });
  419. console.log(
  420. "ERROR",
  421. "STATIONS_INDEX",
  422. `Indexing stations failed. "${err}"`
  423. );
  424. return cb({ status: "failure", message: err });
  425. }
  426. console.log(
  427. "SUCCESS",
  428. "STATIONS_INDEX",
  429. `Indexing stations successful.`,
  430. false
  431. );
  432. return cb({ status: "success", stations: stations });
  433. }
  434. );
  435. },
  436. /**
  437. * Obtains basic metadata of a station in order to format an activity
  438. *
  439. * @param session
  440. * @param stationId - the station id
  441. * @param cb
  442. */
  443. getStationForActivity: (session, stationId, cb) => {
  444. async.waterfall(
  445. [
  446. (next) => {
  447. stations
  448. .runJob("GET_STATION", { stationId })
  449. .then((station) => next(null, station))
  450. .catch(next);
  451. },
  452. ],
  453. async (err, station) => {
  454. if (err) {
  455. err = await utils.runJob("GET_ERROR", { error: err });
  456. console.log(
  457. "ERROR",
  458. "STATIONS_GET_STATION_FOR_ACTIVITY",
  459. `Failed to obtain metadata of station ${stationId} for activity formatting. "${err}"`
  460. );
  461. return cb({ status: "failure", message: err });
  462. } else {
  463. console.log(
  464. "SUCCESS",
  465. "STATIONS_GET_STATION_FOR_ACTIVITY",
  466. `Obtained metadata of station ${stationId} for activity formatting successfully.`
  467. );
  468. return cb({
  469. status: "success",
  470. data: {
  471. title: station.displayName,
  472. thumbnail: station.currentSong
  473. ? station.currentSong.thumbnail
  474. : "",
  475. },
  476. });
  477. }
  478. }
  479. );
  480. },
  481. /**
  482. * Verifies that a station exists
  483. *
  484. * @param session
  485. * @param stationName - the station name
  486. * @param cb
  487. */
  488. existsByName: (session, stationName, cb) => {
  489. async.waterfall(
  490. [
  491. (next) => {
  492. stations
  493. .runJob("GET_STATION_BY_NAME", { stationName })
  494. .then((station) => next(null, station))
  495. .catch(next);
  496. },
  497. (station, next) => {
  498. if (!station) return next(null, false);
  499. stations
  500. .runJob("CAN_USER_VIEW_STATION", {
  501. station,
  502. userId: session.userId,
  503. })
  504. .then((exists) => next(null, exists))
  505. .catch(next);
  506. },
  507. ],
  508. async (err, exists) => {
  509. if (err) {
  510. err = await utils.runJob("GET_ERROR", { error: err });
  511. console.log(
  512. "ERROR",
  513. "STATION_EXISTS_BY_NAME",
  514. `Checking if station "${stationName}" exists failed. "${err}"`
  515. );
  516. return cb({ status: "failure", message: err });
  517. }
  518. console.log(
  519. "SUCCESS",
  520. "STATION_EXISTS_BY_NAME",
  521. `Station "${stationName}" exists successfully.` /*, false*/
  522. );
  523. cb({ status: "success", exists });
  524. }
  525. );
  526. },
  527. /**
  528. * Gets the official playlist for a station
  529. *
  530. * @param session
  531. * @param stationId - the station id
  532. * @param cb
  533. */
  534. getPlaylist: (session, stationId, cb) => {
  535. async.waterfall(
  536. [
  537. (next) => {
  538. stations
  539. .runJob("GET_STATION", { stationId })
  540. .then((station) => next(null, station))
  541. .catch(next);
  542. },
  543. (station, next) => {
  544. stations
  545. .runJob("CAN_USER_VIEW_STATION", {
  546. station,
  547. userId: session.userId,
  548. })
  549. .then((canView) => {
  550. if (canView) return next(null, station);
  551. return next("Insufficient permissions.");
  552. })
  553. .catch((err) => {
  554. return next(err);
  555. });
  556. },
  557. (station, next) => {
  558. if (!station) return next("Station not found.");
  559. else if (station.type !== "official")
  560. return next("This is not an official station.");
  561. else next();
  562. },
  563. (next) => {
  564. cache
  565. .runJob("HGET", {
  566. table: "officialPlaylists",
  567. key: stationId,
  568. })
  569. .then((playlist) => next(null, playlist))
  570. .catch(next);
  571. },
  572. (playlist, next) => {
  573. if (!playlist) return next("Playlist not found.");
  574. next(null, playlist);
  575. },
  576. ],
  577. async (err, playlist) => {
  578. if (err) {
  579. err = await utils.runJob("GET_ERROR", { error: err });
  580. console.log(
  581. "ERROR",
  582. "STATIONS_GET_PLAYLIST",
  583. `Getting playlist for station "${stationId}" failed. "${err}"`
  584. );
  585. return cb({ status: "failure", message: err });
  586. } else {
  587. console.log(
  588. "SUCCESS",
  589. "STATIONS_GET_PLAYLIST",
  590. `Got playlist for station "${stationId}" successfully.`,
  591. false
  592. );
  593. cb({ status: "success", data: playlist.songs });
  594. }
  595. }
  596. );
  597. },
  598. /**
  599. * Joins the station by its name
  600. *
  601. * @param session
  602. * @param stationName - the station name
  603. * @param cb
  604. * @return {{ status: String, userCount: Integer }}
  605. */
  606. join: (session, stationName, cb) => {
  607. async.waterfall(
  608. [
  609. (next) => {
  610. stations
  611. .runJob("GET_STATION_BY_NAME", { stationName })
  612. .then((station) => next(null, station))
  613. .catch(next);
  614. },
  615. (station, next) => {
  616. if (!station) return next("Station not found.");
  617. stations
  618. .runJob("CAN_USER_VIEW_STATION", {
  619. station,
  620. userId: session.userId,
  621. })
  622. .then((canView) => {
  623. if (!canView) next("Not allowed to join station.");
  624. else next(null, station);
  625. })
  626. .catch((err) => {
  627. return next(err);
  628. });
  629. },
  630. (station, next) => {
  631. utils.runJob("SOCKET_JOIN_ROOM", {
  632. socketId: session.socketId,
  633. room: `station.${station._id}`,
  634. });
  635. let data = {
  636. _id: station._id,
  637. type: station.type,
  638. currentSong: station.currentSong,
  639. startedAt: station.startedAt,
  640. paused: station.paused,
  641. timePaused: station.timePaused,
  642. pausedAt: station.pausedAt,
  643. description: station.description,
  644. displayName: station.displayName,
  645. privacy: station.privacy,
  646. locked: station.locked,
  647. partyMode: station.partyMode,
  648. owner: station.owner,
  649. privatePlaylist: station.privatePlaylist,
  650. };
  651. userList[session.socketId] = station._id;
  652. next(null, data);
  653. },
  654. (data, next) => {
  655. data = JSON.parse(JSON.stringify(data));
  656. data.userCount = usersPerStationCount[data._id] || 0;
  657. data.users = usersPerStation[data._id] || [];
  658. if (!data.currentSong || !data.currentSong.title)
  659. return next(null, data);
  660. utils.runJob("SOCKET_JOIN_SONG_ROOM", {
  661. socketId: session.socketId,
  662. room: `song.${data.currentSong.songId}`,
  663. });
  664. data.currentSong.skipVotes =
  665. data.currentSong.skipVotes.length;
  666. songs
  667. .runJob("GET_SONG_FROM_ID", {
  668. songId: data.currentSong.songId,
  669. })
  670. .then((response) => {
  671. const song = response.song;
  672. if (song) {
  673. data.currentSong.likes = song.likes;
  674. data.currentSong.dislikes = song.dislikes;
  675. } else {
  676. data.currentSong.likes = -1;
  677. data.currentSong.dislikes = -1;
  678. }
  679. })
  680. .catch((err) => {
  681. data.currentSong.likes = -1;
  682. data.currentSong.dislikes = -1;
  683. })
  684. .finally(() => {
  685. next(null, data);
  686. });
  687. },
  688. ],
  689. async (err, data) => {
  690. if (err) {
  691. err = await utils.runJob("GET_ERROR", { error: err });
  692. console.log(
  693. "ERROR",
  694. "STATIONS_JOIN",
  695. `Joining station "${stationName}" failed. "${err}"`
  696. );
  697. return cb({ status: "failure", message: err });
  698. }
  699. console.log(
  700. "SUCCESS",
  701. "STATIONS_JOIN",
  702. `Joined station "${data._id}" successfully.`
  703. );
  704. cb({ status: "success", data });
  705. }
  706. );
  707. },
  708. /**
  709. * Toggles if a station is locked
  710. *
  711. * @param session
  712. * @param stationId - the station id
  713. * @param cb
  714. */
  715. toggleLock: hooks.ownerRequired(async (session, stationId, cb) => {
  716. const stationModel = await db.runJob("GET_MODEL", {
  717. modelName: "station",
  718. });
  719. async.waterfall(
  720. [
  721. (next) => {
  722. stations
  723. .runJob("GET_STATION", { stationId })
  724. .then((station) => next(null, station))
  725. .catch(next);
  726. },
  727. (station, next) => {
  728. stationModel.updateOne(
  729. { _id: stationId },
  730. { $set: { locked: !station.locked } },
  731. next
  732. );
  733. },
  734. (res, next) => {
  735. stations
  736. .runJob("UPDATE_STATION", { stationId })
  737. .then((station) => next(null, station))
  738. .catch(next);
  739. },
  740. ],
  741. async (err, station) => {
  742. if (err) {
  743. err = await utils.runJob("GET_ERROR", { error: err });
  744. console.log(
  745. "ERROR",
  746. "STATIONS_UPDATE_LOCKED_STATUS",
  747. `Toggling the queue lock for station "${stationId}" failed. "${err}"`
  748. );
  749. return cb({ status: "failure", message: err });
  750. } else {
  751. console.log(
  752. "SUCCESS",
  753. "STATIONS_UPDATE_LOCKED_STATUS",
  754. `Toggled the queue lock for station "${stationId}" successfully to "${station.locked}".`
  755. );
  756. cache.runJob("PUB", {
  757. channel: "station.queueLockToggled",
  758. value: {
  759. stationId,
  760. locked: station.locked,
  761. },
  762. });
  763. return cb({ status: "success", data: station.locked });
  764. }
  765. }
  766. );
  767. }),
  768. /**
  769. * Votes to skip a station
  770. *
  771. * @param session
  772. * @param stationId - the station id
  773. * @param cb
  774. */
  775. voteSkip: hooks.loginRequired(async (session, stationId, cb) => {
  776. const stationModel = await db.runJob("GET_MODEL", {
  777. modelName: "station",
  778. });
  779. let skipVotes = 0;
  780. let shouldSkip = false;
  781. async.waterfall(
  782. [
  783. (next) => {
  784. stations
  785. .runJob("GET_STATION", { stationId })
  786. .then((station) => next(null, station))
  787. .catch(next);
  788. },
  789. (station, next) => {
  790. if (!station) return next("Station not found.");
  791. stations
  792. .runJob("CAN_USER_VIEW_STATION", {
  793. station,
  794. userId: session.userId,
  795. })
  796. .then((canView) => {
  797. if (canView) return next(null, station);
  798. return next("Insufficient permissions.");
  799. })
  800. .catch((err) => {
  801. return next(err);
  802. });
  803. },
  804. (station, next) => {
  805. if (!station.currentSong)
  806. return next("There is currently no song to skip.");
  807. if (
  808. station.currentSong.skipVotes.indexOf(
  809. session.userId
  810. ) !== -1
  811. )
  812. return next(
  813. "You have already voted to skip this song."
  814. );
  815. next(null, station);
  816. },
  817. (station, next) => {
  818. stationModel.updateOne(
  819. { _id: stationId },
  820. { $push: { "currentSong.skipVotes": session.userId } },
  821. next
  822. );
  823. },
  824. (res, next) => {
  825. stations
  826. .runJob("UPDATE_STATION", { stationId })
  827. .then((station) => next(null, station))
  828. .catch(next);
  829. },
  830. (station, next) => {
  831. if (!station) return next("Station not found.");
  832. next(null, station);
  833. },
  834. (station, next) => {
  835. skipVotes = station.currentSong.skipVotes.length;
  836. utils
  837. .runJob("GET_ROOM_SOCKETS", {
  838. room: `station.${stationId}`,
  839. })
  840. .then((sockets) => next(null, sockets))
  841. .catch(next);
  842. },
  843. (sockets, next) => {
  844. if (sockets.length <= skipVotes) shouldSkip = true;
  845. next();
  846. },
  847. ],
  848. async (err, station) => {
  849. if (err) {
  850. err = await utils.runJob("GET_ERROR", { error: err });
  851. console.log(
  852. "ERROR",
  853. "STATIONS_VOTE_SKIP",
  854. `Vote skipping station "${stationId}" failed. "${err}"`
  855. );
  856. return cb({ status: "failure", message: err });
  857. }
  858. console.log(
  859. "SUCCESS",
  860. "STATIONS_VOTE_SKIP",
  861. `Vote skipping "${stationId}" successful.`
  862. );
  863. cache.runJob("PUB", {
  864. channel: "station.voteSkipSong",
  865. value: stationId,
  866. });
  867. cb({
  868. status: "success",
  869. message: "Successfully voted to skip the song.",
  870. });
  871. if (shouldSkip) stations.runJob("SKIP_STATION", { stationId });
  872. }
  873. );
  874. }),
  875. /**
  876. * Force skips a station
  877. *
  878. * @param session
  879. * @param stationId - the station id
  880. * @param cb
  881. */
  882. forceSkip: hooks.ownerRequired((session, stationId, cb) => {
  883. async.waterfall(
  884. [
  885. (next) => {
  886. stations
  887. .runJob("GET_STATION", { stationId })
  888. .then((station) => next(null, station))
  889. .catch(next);
  890. },
  891. (station, next) => {
  892. if (!station) return next("Station not found.");
  893. next();
  894. },
  895. ],
  896. async (err) => {
  897. if (err) {
  898. err = await utils.runJob("GET_ERROR", { error: err });
  899. console.log(
  900. "ERROR",
  901. "STATIONS_FORCE_SKIP",
  902. `Force skipping station "${stationId}" failed. "${err}"`
  903. );
  904. return cb({ status: "failure", message: err });
  905. }
  906. notifications.runJob("UNSCHEDULE", {
  907. name: `stations.nextSong?id=${stationId}`,
  908. });
  909. stations.runJob("SKIP_STATION", { stationId });
  910. console.log(
  911. "SUCCESS",
  912. "STATIONS_FORCE_SKIP",
  913. `Force skipped station "${stationId}" successfully.`
  914. );
  915. return cb({
  916. status: "success",
  917. message: "Successfully skipped station.",
  918. });
  919. }
  920. );
  921. }),
  922. /**
  923. * Leaves the user's current station
  924. *
  925. * @param session
  926. * @param stationId
  927. * @param cb
  928. * @return {{ status: String, userCount: Integer }}
  929. */
  930. leave: (session, stationId, cb) => {
  931. async.waterfall(
  932. [
  933. (next) => {
  934. stations
  935. .runJob("GET_STATION", { stationId })
  936. .then((station) => next(null, station))
  937. .catch(next);
  938. },
  939. (station, next) => {
  940. if (!station) return next("Station not found.");
  941. next();
  942. },
  943. ],
  944. async (err, userCount) => {
  945. if (err) {
  946. err = await utils.runJob("GET_ERROR", { error: err });
  947. console.log(
  948. "ERROR",
  949. "STATIONS_LEAVE",
  950. `Leaving station "${stationId}" failed. "${err}"`
  951. );
  952. return cb({ status: "failure", message: err });
  953. }
  954. console.log(
  955. "SUCCESS",
  956. "STATIONS_LEAVE",
  957. `Left station "${stationId}" successfully.`
  958. );
  959. utils.runJob("SOCKET_LEAVE_ROOMS", { socketId: session });
  960. delete userList[session.socketId];
  961. return cb({
  962. status: "success",
  963. message: "Successfully left station.",
  964. userCount,
  965. });
  966. }
  967. );
  968. },
  969. /**
  970. * Updates a station's name
  971. *
  972. * @param session
  973. * @param stationId - the station id
  974. * @param newName - the new station name
  975. * @param cb
  976. */
  977. updateName: hooks.ownerRequired(async (session, stationId, newName, cb) => {
  978. const stationModel = await db.runJob("GET_MODEL", {
  979. modelName: "station",
  980. });
  981. async.waterfall(
  982. [
  983. (next) => {
  984. stationModel.updateOne(
  985. { _id: stationId },
  986. { $set: { name: newName } },
  987. { runValidators: true },
  988. next
  989. );
  990. },
  991. (res, next) => {
  992. stations
  993. .runJob("UPDATE_STATION", { stationId })
  994. .then((station) => next(null, station))
  995. .catch(next);
  996. },
  997. ],
  998. async (err) => {
  999. if (err) {
  1000. err = await utils.runJob("GET_ERROR", { error: err });
  1001. console.log(
  1002. "ERROR",
  1003. "STATIONS_UPDATE_NAME",
  1004. `Updating station "${stationId}" name to "${newName}" failed. "${err}"`
  1005. );
  1006. return cb({ status: "failure", message: err });
  1007. }
  1008. console.log(
  1009. "SUCCESS",
  1010. "STATIONS_UPDATE_NAME",
  1011. `Updated station "${stationId}" name to "${newName}" successfully.`
  1012. );
  1013. return cb({
  1014. status: "success",
  1015. message: "Successfully updated the name.",
  1016. });
  1017. }
  1018. );
  1019. }),
  1020. /**
  1021. * Updates a station's display name
  1022. *
  1023. * @param session
  1024. * @param stationId - the station id
  1025. * @param newDisplayName - the new station display name
  1026. * @param cb
  1027. */
  1028. updateDisplayName: hooks.ownerRequired(
  1029. async (session, stationId, newDisplayName, cb) => {
  1030. const stationModel = await db.runJob("GET_MODEL", {
  1031. modelName: "station",
  1032. });
  1033. async.waterfall(
  1034. [
  1035. (next) => {
  1036. stationModel.updateOne(
  1037. { _id: stationId },
  1038. { $set: { displayName: newDisplayName } },
  1039. { runValidators: true },
  1040. next
  1041. );
  1042. },
  1043. (res, next) => {
  1044. stations
  1045. .runJob("UPDATE_STATION", { stationId })
  1046. .then((station) => next(null, station))
  1047. .catch(next);
  1048. },
  1049. ],
  1050. async (err) => {
  1051. if (err) {
  1052. err = await utils.runJob("GET_ERROR", { error: err });
  1053. console.log(
  1054. "ERROR",
  1055. "STATIONS_UPDATE_DISPLAY_NAME",
  1056. `Updating station "${stationId}" displayName to "${newDisplayName}" failed. "${err}"`
  1057. );
  1058. return cb({ status: "failure", message: err });
  1059. }
  1060. console.log(
  1061. "SUCCESS",
  1062. "STATIONS_UPDATE_DISPLAY_NAME",
  1063. `Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`
  1064. );
  1065. return cb({
  1066. status: "success",
  1067. message: "Successfully updated the display name.",
  1068. });
  1069. }
  1070. );
  1071. }
  1072. ),
  1073. /**
  1074. * Updates a station's description
  1075. *
  1076. * @param session
  1077. * @param stationId - the station id
  1078. * @param newDescription - the new station description
  1079. * @param cb
  1080. */
  1081. updateDescription: hooks.ownerRequired(
  1082. async (session, stationId, newDescription, cb) => {
  1083. const stationModel = await db.runJob("GET_MODEL", {
  1084. modelName: "station",
  1085. });
  1086. async.waterfall(
  1087. [
  1088. (next) => {
  1089. stationModel.updateOne(
  1090. { _id: stationId },
  1091. { $set: { description: newDescription } },
  1092. { runValidators: true },
  1093. next
  1094. );
  1095. },
  1096. (res, next) => {
  1097. stations
  1098. .runJob("UPDATE_STATION", { stationId })
  1099. .then((station) => next(null, station))
  1100. .catch(next);
  1101. },
  1102. ],
  1103. async (err) => {
  1104. if (err) {
  1105. err = await utils.runJob("GET_ERROR", { error: err });
  1106. console.log(
  1107. "ERROR",
  1108. "STATIONS_UPDATE_DESCRIPTION",
  1109. `Updating station "${stationId}" description to "${newDescription}" failed. "${err}"`
  1110. );
  1111. return cb({ status: "failure", message: err });
  1112. }
  1113. console.log(
  1114. "SUCCESS",
  1115. "STATIONS_UPDATE_DESCRIPTION",
  1116. `Updated station "${stationId}" description to "${newDescription}" successfully.`
  1117. );
  1118. return cb({
  1119. status: "success",
  1120. message: "Successfully updated the description.",
  1121. });
  1122. }
  1123. );
  1124. }
  1125. ),
  1126. /**
  1127. * Updates a station's privacy
  1128. *
  1129. * @param session
  1130. * @param stationId - the station id
  1131. * @param newPrivacy - the new station privacy
  1132. * @param cb
  1133. */
  1134. updatePrivacy: hooks.ownerRequired(
  1135. async (session, stationId, newPrivacy, cb) => {
  1136. const stationModel = await db.runJob("GET_MODEL", {
  1137. modelName: "station",
  1138. });
  1139. async.waterfall(
  1140. [
  1141. (next) => {
  1142. stationModel.updateOne(
  1143. { _id: stationId },
  1144. { $set: { privacy: newPrivacy } },
  1145. { runValidators: true },
  1146. next
  1147. );
  1148. },
  1149. (res, next) => {
  1150. stations
  1151. .runJob("UPDATE_STATION", { stationId })
  1152. .then((station) => next(null, station))
  1153. .catch(next);
  1154. },
  1155. ],
  1156. async (err) => {
  1157. if (err) {
  1158. err = await utils.runJob("GET_ERROR", { error: err });
  1159. console.log(
  1160. "ERROR",
  1161. "STATIONS_UPDATE_PRIVACY",
  1162. `Updating station "${stationId}" privacy to "${newPrivacy}" failed. "${err}"`
  1163. );
  1164. return cb({ status: "failure", message: err });
  1165. }
  1166. console.log(
  1167. "SUCCESS",
  1168. "STATIONS_UPDATE_PRIVACY",
  1169. `Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`
  1170. );
  1171. return cb({
  1172. status: "success",
  1173. message: "Successfully updated the privacy.",
  1174. });
  1175. }
  1176. );
  1177. }
  1178. ),
  1179. /**
  1180. * Updates a station's genres
  1181. *
  1182. * @param session
  1183. * @param stationId - the station id
  1184. * @param newGenres - the new station genres
  1185. * @param cb
  1186. */
  1187. updateGenres: hooks.ownerRequired(
  1188. async (session, stationId, newGenres, cb) => {
  1189. const stationModel = await db.runJob("GET_MODEL", {
  1190. modelName: "station",
  1191. });
  1192. async.waterfall(
  1193. [
  1194. (next) => {
  1195. stationModel.updateOne(
  1196. { _id: stationId },
  1197. { $set: { genres: newGenres } },
  1198. { runValidators: true },
  1199. next
  1200. );
  1201. },
  1202. (res, next) => {
  1203. stations
  1204. .runJob("UPDATE_STATION", { stationId })
  1205. .then((station) => next(null, station))
  1206. .catch(next);
  1207. },
  1208. ],
  1209. async (err) => {
  1210. if (err) {
  1211. err = await utils.runJob("GET_ERROR", { error: err });
  1212. console.log(
  1213. "ERROR",
  1214. "STATIONS_UPDATE_GENRES",
  1215. `Updating station "${stationId}" genres to "${newGenres}" failed. "${err}"`
  1216. );
  1217. return cb({ status: "failure", message: err });
  1218. }
  1219. console.log(
  1220. "SUCCESS",
  1221. "STATIONS_UPDATE_GENRES",
  1222. `Updated station "${stationId}" genres to "${newGenres}" successfully.`
  1223. );
  1224. return cb({
  1225. status: "success",
  1226. message: "Successfully updated the genres.",
  1227. });
  1228. }
  1229. );
  1230. }
  1231. ),
  1232. /**
  1233. * Updates a station's blacklisted genres
  1234. *
  1235. * @param session
  1236. * @param stationId - the station id
  1237. * @param newBlacklistedGenres - the new station blacklisted genres
  1238. * @param cb
  1239. */
  1240. updateBlacklistedGenres: hooks.ownerRequired(
  1241. async (session, stationId, newBlacklistedGenres, cb) => {
  1242. const stationModel = await db.runJob("GET_MODEL", {
  1243. modelName: "station",
  1244. });
  1245. async.waterfall(
  1246. [
  1247. (next) => {
  1248. stationModel.updateOne(
  1249. { _id: stationId },
  1250. {
  1251. $set: {
  1252. blacklistedGenres: newBlacklistedGenres,
  1253. },
  1254. },
  1255. { runValidators: true },
  1256. next
  1257. );
  1258. },
  1259. (res, next) => {
  1260. stations
  1261. .runJob("UPDATE_STATION", { stationId })
  1262. .then((station) => next(null, station))
  1263. .catch(next);
  1264. },
  1265. ],
  1266. async (err) => {
  1267. if (err) {
  1268. err = await utils.runJob("GET_ERROR", { error: err });
  1269. console.log(
  1270. "ERROR",
  1271. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1272. `Updating station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" failed. "${err}"`
  1273. );
  1274. return cb({ status: "failure", message: err });
  1275. }
  1276. console.log(
  1277. "SUCCESS",
  1278. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1279. `Updated station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" successfully.`
  1280. );
  1281. return cb({
  1282. status: "success",
  1283. message: "Successfully updated the blacklisted genres.",
  1284. });
  1285. }
  1286. );
  1287. }
  1288. ),
  1289. /**
  1290. * Updates a station's party mode
  1291. *
  1292. * @param session
  1293. * @param stationId - the station id
  1294. * @param newPartyMode - the new station party mode
  1295. * @param cb
  1296. */
  1297. updatePartyMode: hooks.ownerRequired(
  1298. async (session, stationId, newPartyMode, cb) => {
  1299. const stationModel = await db.runJob("GET_MODEL", {
  1300. modelName: "station",
  1301. });
  1302. async.waterfall(
  1303. [
  1304. (next) => {
  1305. stations
  1306. .runJob("GET_STATION", { stationId })
  1307. .then((station) => next(null, station))
  1308. .catch(next);
  1309. },
  1310. (station, next) => {
  1311. if (!station) return next("Station not found.");
  1312. if (station.partyMode === newPartyMode)
  1313. return next(
  1314. "The party mode was already " +
  1315. (newPartyMode ? "enabled." : "disabled.")
  1316. );
  1317. stationModel.updateOne(
  1318. { _id: stationId },
  1319. { $set: { partyMode: newPartyMode } },
  1320. { runValidators: true },
  1321. next
  1322. );
  1323. },
  1324. (res, next) => {
  1325. stations
  1326. .runJob("UPDATE_STATION", { stationId })
  1327. .then((station) => next(null, station))
  1328. .catch(next);
  1329. },
  1330. ],
  1331. async (err) => {
  1332. if (err) {
  1333. err = await utils.runJob("GET_ERROR", { error: err });
  1334. console.log(
  1335. "ERROR",
  1336. "STATIONS_UPDATE_PARTY_MODE",
  1337. `Updating station "${stationId}" party mode to "${newPartyMode}" failed. "${err}"`
  1338. );
  1339. return cb({ status: "failure", message: err });
  1340. }
  1341. console.log(
  1342. "SUCCESS",
  1343. "STATIONS_UPDATE_PARTY_MODE",
  1344. `Updated station "${stationId}" party mode to "${newPartyMode}" successfully.`
  1345. );
  1346. cache.runJob("PUB", {
  1347. channel: "station.updatePartyMode",
  1348. value: {
  1349. stationId: stationId,
  1350. partyMode: newPartyMode,
  1351. },
  1352. });
  1353. stations.runJob("SKIP_STATION", { stationId });
  1354. return cb({
  1355. status: "success",
  1356. message: "Successfully updated the party mode.",
  1357. });
  1358. }
  1359. );
  1360. }
  1361. ),
  1362. /**
  1363. * Pauses a station
  1364. *
  1365. * @param session
  1366. * @param stationId - the station id
  1367. * @param cb
  1368. */
  1369. pause: hooks.ownerRequired(async (session, stationId, cb) => {
  1370. const stationModel = await db.runJob("GET_MODEL", {
  1371. modelName: "station",
  1372. });
  1373. async.waterfall(
  1374. [
  1375. (next) => {
  1376. stations
  1377. .runJob("GET_STATION", { stationId })
  1378. .then((station) => next(null, station))
  1379. .catch(next);
  1380. },
  1381. (station, next) => {
  1382. if (!station) return next("Station not found.");
  1383. if (station.paused)
  1384. return next("That station was already paused.");
  1385. stationModel.updateOne(
  1386. { _id: stationId },
  1387. { $set: { paused: true, pausedAt: Date.now() } },
  1388. next
  1389. );
  1390. },
  1391. (res, next) => {
  1392. stations
  1393. .runJob("UPDATE_STATION", { stationId })
  1394. .then((station) => next(null, station))
  1395. .catch(next);
  1396. },
  1397. ],
  1398. async (err) => {
  1399. if (err) {
  1400. err = await utils.runJob("GET_ERROR", { error: err });
  1401. console.log(
  1402. "ERROR",
  1403. "STATIONS_PAUSE",
  1404. `Pausing station "${stationId}" failed. "${err}"`
  1405. );
  1406. return cb({ status: "failure", message: err });
  1407. }
  1408. console.log(
  1409. "SUCCESS",
  1410. "STATIONS_PAUSE",
  1411. `Paused station "${stationId}" successfully.`
  1412. );
  1413. cache.runJob("PUB", {
  1414. channel: "station.pause",
  1415. value: stationId,
  1416. });
  1417. notifications.runJob("UNSCHEDULE", {
  1418. name: `stations.nextSong?id=${stationId}`,
  1419. });
  1420. return cb({
  1421. status: "success",
  1422. message: "Successfully paused.",
  1423. });
  1424. }
  1425. );
  1426. }),
  1427. /**
  1428. * Resumes a station
  1429. *
  1430. * @param session
  1431. * @param stationId - the station id
  1432. * @param cb
  1433. */
  1434. resume: hooks.ownerRequired(async (session, stationId, cb) => {
  1435. const stationModel = await db.runJob("GET_MODEL", {
  1436. modelName: "station",
  1437. });
  1438. async.waterfall(
  1439. [
  1440. (next) => {
  1441. stations
  1442. .runJob("GET_STATION", { stationId })
  1443. .then((station) => next(null, station))
  1444. .catch(next);
  1445. },
  1446. (station, next) => {
  1447. if (!station) return next("Station not found.");
  1448. if (!station.paused)
  1449. return next("That station is not paused.");
  1450. station.timePaused += Date.now() - station.pausedAt;
  1451. stationModel.updateOne(
  1452. { _id: stationId },
  1453. {
  1454. $set: { paused: false },
  1455. $inc: { timePaused: Date.now() - station.pausedAt },
  1456. },
  1457. next
  1458. );
  1459. },
  1460. (res, next) => {
  1461. stations
  1462. .runJob("UPDATE_STATION", { stationId })
  1463. .then((station) => next(null, station))
  1464. .catch(next);
  1465. },
  1466. ],
  1467. async (err) => {
  1468. if (err) {
  1469. err = await utils.runJob("GET_ERROR", { error: err });
  1470. console.log(
  1471. "ERROR",
  1472. "STATIONS_RESUME",
  1473. `Resuming station "${stationId}" failed. "${err}"`
  1474. );
  1475. return cb({ status: "failure", message: err });
  1476. }
  1477. console.log(
  1478. "SUCCESS",
  1479. "STATIONS_RESUME",
  1480. `Resuming station "${stationId}" successfully.`
  1481. );
  1482. cache.runJob("PUB", {
  1483. channel: "station.resume",
  1484. value: stationId,
  1485. });
  1486. return cb({
  1487. status: "success",
  1488. message: "Successfully resumed.",
  1489. });
  1490. }
  1491. );
  1492. }),
  1493. /**
  1494. * Removes a station
  1495. *
  1496. * @param session
  1497. * @param stationId - the station id
  1498. * @param cb
  1499. */
  1500. remove: hooks.ownerRequired(async (session, stationId, cb) => {
  1501. const stationModel = await db.runJob("GET_MODEL", {
  1502. modelName: "station",
  1503. });
  1504. async.waterfall(
  1505. [
  1506. (next) => {
  1507. stationModel.deleteOne({ _id: stationId }, (err) =>
  1508. next(err)
  1509. );
  1510. },
  1511. (next) => {
  1512. cache
  1513. .runJob("HDEL", { table: "stations", key: stationId })
  1514. .then(next)
  1515. .catch(next);
  1516. },
  1517. ],
  1518. async (err) => {
  1519. if (err) {
  1520. err = await utils.runJob("GET_ERROR", { error: err });
  1521. console.log(
  1522. "ERROR",
  1523. "STATIONS_REMOVE",
  1524. `Removing station "${stationId}" failed. "${err}"`
  1525. );
  1526. return cb({ status: "failure", message: err });
  1527. }
  1528. console.log(
  1529. "SUCCESS",
  1530. "STATIONS_REMOVE",
  1531. `Removing station "${stationId}" successfully.`
  1532. );
  1533. cache.runJob("PUB", {
  1534. channel: "station.remove",
  1535. value: stationId,
  1536. });
  1537. activities.runJob("ADD_ACTIVITY", {
  1538. userId: session.userId,
  1539. activityType: "deleted_station",
  1540. payload: [stationId],
  1541. });
  1542. return cb({
  1543. status: "success",
  1544. message: "Successfully removed.",
  1545. });
  1546. }
  1547. );
  1548. }),
  1549. /**
  1550. * Create a station
  1551. *
  1552. * @param session
  1553. * @param data - the station data
  1554. * @param cb
  1555. */
  1556. create: hooks.loginRequired(async (session, data, cb) => {
  1557. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  1558. const stationModel = await db.runJob("GET_MODEL", {
  1559. modelName: "station",
  1560. });
  1561. data.name = data.name.toLowerCase();
  1562. let blacklist = [
  1563. "country",
  1564. "edm",
  1565. "musare",
  1566. "hip-hop",
  1567. "rap",
  1568. "top-hits",
  1569. "todays-hits",
  1570. "old-school",
  1571. "christmas",
  1572. "about",
  1573. "support",
  1574. "staff",
  1575. "help",
  1576. "news",
  1577. "terms",
  1578. "privacy",
  1579. "profile",
  1580. "c",
  1581. "community",
  1582. "tos",
  1583. "login",
  1584. "register",
  1585. "p",
  1586. "official",
  1587. "o",
  1588. "trap",
  1589. "faq",
  1590. "team",
  1591. "donate",
  1592. "buy",
  1593. "shop",
  1594. "forums",
  1595. "explore",
  1596. "settings",
  1597. "admin",
  1598. "auth",
  1599. "reset_password",
  1600. ];
  1601. async.waterfall(
  1602. [
  1603. (next) => {
  1604. if (!data) return next("Invalid data.");
  1605. next();
  1606. },
  1607. (next) => {
  1608. stationModel.findOne(
  1609. {
  1610. $or: [
  1611. { name: data.name },
  1612. {
  1613. displayName: new RegExp(
  1614. `^${data.displayName}$`,
  1615. "i"
  1616. ),
  1617. },
  1618. ],
  1619. },
  1620. next
  1621. );
  1622. },
  1623. (station, next) => {
  1624. if (station)
  1625. return next(
  1626. "A station with that name or display name already exists."
  1627. );
  1628. const {
  1629. name,
  1630. displayName,
  1631. description,
  1632. genres,
  1633. playlist,
  1634. type,
  1635. blacklistedGenres,
  1636. } = data;
  1637. if (type === "official") {
  1638. userModel.findOne(
  1639. { _id: session.userId },
  1640. (err, user) => {
  1641. if (err) return next(err);
  1642. if (!user) return next("User not found.");
  1643. if (user.role !== "admin")
  1644. return next("Admin required.");
  1645. stationModel.create(
  1646. {
  1647. name,
  1648. displayName,
  1649. description,
  1650. type,
  1651. privacy: "private",
  1652. playlist,
  1653. genres,
  1654. blacklistedGenres,
  1655. currentSong: stations.defaultSong,
  1656. },
  1657. next
  1658. );
  1659. }
  1660. );
  1661. } else if (type === "community") {
  1662. if (blacklist.indexOf(name) !== -1)
  1663. return next(
  1664. "That name is blacklisted. Please use a different name."
  1665. );
  1666. stationModel.create(
  1667. {
  1668. name,
  1669. displayName,
  1670. description,
  1671. type,
  1672. privacy: "private",
  1673. owner: session.userId,
  1674. queue: [],
  1675. currentSong: null,
  1676. },
  1677. next
  1678. );
  1679. }
  1680. },
  1681. ],
  1682. async (err, station) => {
  1683. if (err) {
  1684. err = await utils.runJob("GET_ERROR", { error: err });
  1685. console.log(
  1686. "ERROR",
  1687. "STATIONS_CREATE",
  1688. `Creating station failed. "${err}"`
  1689. );
  1690. return cb({ status: "failure", message: err });
  1691. }
  1692. console.log(
  1693. "SUCCESS",
  1694. "STATIONS_CREATE",
  1695. `Created station "${station._id}" successfully.`
  1696. );
  1697. cache.runJob("PUB", {
  1698. channel: "station.create",
  1699. value: station._id,
  1700. });
  1701. activities.runJob("ADD_ACTIVITY", {
  1702. userId: session.userId,
  1703. activityType: "created_station",
  1704. payload: [station._id],
  1705. });
  1706. return cb({
  1707. status: "success",
  1708. message: "Successfully created station.",
  1709. });
  1710. }
  1711. );
  1712. }),
  1713. /**
  1714. * Adds song to station queue
  1715. *
  1716. * @param session
  1717. * @param stationId - the station id
  1718. * @param songId - the song id
  1719. * @param cb
  1720. */
  1721. addToQueue: hooks.loginRequired(async (session, stationId, songId, cb) => {
  1722. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  1723. const stationModel = await db.runJob("GET_MODEL", {
  1724. modelName: "station",
  1725. });
  1726. async.waterfall(
  1727. [
  1728. (next) => {
  1729. stations
  1730. .runJob("GET_STATION", { stationId })
  1731. .then((station) => next(null, station))
  1732. .catch(next);
  1733. },
  1734. (station, next) => {
  1735. if (!station) return next("Station not found.");
  1736. if (station.locked) {
  1737. userModel.findOne(
  1738. { _id: session.userId },
  1739. (err, user) => {
  1740. if (
  1741. user.role !== "admin" &&
  1742. station.owner !== session.userId
  1743. )
  1744. return next(
  1745. "Only owners and admins can add songs to a locked queue."
  1746. );
  1747. else return next(null, station);
  1748. }
  1749. );
  1750. } else {
  1751. return next(null, station);
  1752. }
  1753. },
  1754. (station, next) => {
  1755. if (station.type !== "community")
  1756. return next("That station is not a community station.");
  1757. stations
  1758. .runJob("CAN_USER_VIEW_STATION", {
  1759. station,
  1760. userId: session.userId,
  1761. })
  1762. .then((canView) => {
  1763. if (canView) return next(null, station);
  1764. return next("Insufficient permissions.");
  1765. })
  1766. .catch((err) => {
  1767. return next(err);
  1768. });
  1769. },
  1770. (station, next) => {
  1771. if (
  1772. station.currentSong &&
  1773. station.currentSong.songId === songId
  1774. )
  1775. return next("That song is currently playing.");
  1776. async.each(
  1777. station.queue,
  1778. (queueSong, next) => {
  1779. if (queueSong.songId === songId)
  1780. return next(
  1781. "That song is already in the queue."
  1782. );
  1783. next();
  1784. },
  1785. (err) => {
  1786. next(err, station);
  1787. }
  1788. );
  1789. },
  1790. (station, next) => {
  1791. // songs
  1792. // .runJob("GET_SONG", { id: songId })
  1793. // .then((song) => {
  1794. // if (song) return next(null, song, station);
  1795. // else {
  1796. utils
  1797. .runJob("GET_SONG_FROM_YOUTUBE", { songId })
  1798. .then((response) => {
  1799. const song = response.song;
  1800. song.artists = [];
  1801. song.skipDuration = 0;
  1802. song.likes = -1;
  1803. song.dislikes = -1;
  1804. song.thumbnail = "empty";
  1805. song.explicit = false;
  1806. next(null, song, station);
  1807. })
  1808. .catch((err) => {
  1809. next(err);
  1810. });
  1811. // }
  1812. // })
  1813. // .catch((err) => {
  1814. // next(err);
  1815. // });
  1816. },
  1817. (song, station, next) => {
  1818. let queue = station.queue;
  1819. song.requestedBy = session.userId;
  1820. queue.push(song);
  1821. let totalDuration = 0;
  1822. queue.forEach((song) => {
  1823. totalDuration += song.duration;
  1824. });
  1825. if (totalDuration >= 3600 * 3)
  1826. return next("The max length of the queue is 3 hours.");
  1827. next(null, song, station);
  1828. },
  1829. (song, station, next) => {
  1830. let queue = station.queue;
  1831. if (queue.length === 0) return next(null, song, station);
  1832. let totalDuration = 0;
  1833. const userId = queue[queue.length - 1].requestedBy;
  1834. station.queue.forEach((song) => {
  1835. if (userId === song.requestedBy) {
  1836. totalDuration += song.duration;
  1837. }
  1838. });
  1839. if (totalDuration >= 900)
  1840. return next(
  1841. "The max length of songs per user is 15 minutes."
  1842. );
  1843. next(null, song, station);
  1844. },
  1845. (song, station, next) => {
  1846. let queue = station.queue;
  1847. if (queue.length === 0) return next(null, song);
  1848. let totalSongs = 0;
  1849. const userId = queue[queue.length - 1].requestedBy;
  1850. queue.forEach((song) => {
  1851. if (userId === song.requestedBy) {
  1852. totalSongs++;
  1853. }
  1854. });
  1855. if (totalSongs <= 2) return next(null, song);
  1856. if (totalSongs > 3)
  1857. return next(
  1858. "The max amount of songs per user is 3, and only 2 in a row is allowed."
  1859. );
  1860. if (
  1861. queue[queue.length - 2].requestedBy !== userId ||
  1862. queue[queue.length - 3] !== userId
  1863. )
  1864. return next(
  1865. "The max amount of songs per user is 3, and only 2 in a row is allowed."
  1866. );
  1867. next(null, song);
  1868. },
  1869. (song, next) => {
  1870. stationModel.updateOne(
  1871. { _id: stationId },
  1872. { $push: { queue: song } },
  1873. { runValidators: true },
  1874. next
  1875. );
  1876. },
  1877. (res, next) => {
  1878. stations
  1879. .runJob("UPDATE_STATION", { stationId })
  1880. .then((station) => next(null, station))
  1881. .catch(next);
  1882. },
  1883. ],
  1884. async (err, station) => {
  1885. if (err) {
  1886. err = await utils.runJob("GET_ERROR", { error: err });
  1887. console.log(
  1888. "ERROR",
  1889. "STATIONS_ADD_SONG_TO_QUEUE",
  1890. `Adding song "${songId}" to station "${stationId}" queue failed. "${err}"`
  1891. );
  1892. return cb({ status: "failure", message: err });
  1893. }
  1894. console.log(
  1895. "SUCCESS",
  1896. "STATIONS_ADD_SONG_TO_QUEUE",
  1897. `Added song "${songId}" to station "${stationId}" successfully.`
  1898. );
  1899. cache.runJob("PUB", {
  1900. channel: "station.queueUpdate",
  1901. value: stationId,
  1902. });
  1903. return cb({
  1904. status: "success",
  1905. message: "Successfully added song to queue.",
  1906. });
  1907. }
  1908. );
  1909. }),
  1910. /**
  1911. * Removes song from station queue
  1912. *
  1913. * @param session
  1914. * @param stationId - the station id
  1915. * @param songId - the song id
  1916. * @param cb
  1917. */
  1918. removeFromQueue: hooks.ownerRequired(
  1919. async (session, stationId, songId, cb) => {
  1920. const stationModel = await db.runJob("GET_MODEL", {
  1921. modelName: "station",
  1922. });
  1923. async.waterfall(
  1924. [
  1925. (next) => {
  1926. if (!songId) return next("Invalid song id.");
  1927. stations
  1928. .runJob("GET_STATION", { stationId })
  1929. .then((station) => next(null, station))
  1930. .catch(next);
  1931. },
  1932. (station, next) => {
  1933. if (!station) return next("Station not found.");
  1934. if (station.type !== "community")
  1935. return next("Station is not a community station.");
  1936. async.each(
  1937. station.queue,
  1938. (queueSong, next) => {
  1939. if (queueSong.songId === songId)
  1940. return next(true);
  1941. next();
  1942. },
  1943. (err) => {
  1944. if (err === true) return next();
  1945. next("Song is not currently in the queue.");
  1946. }
  1947. );
  1948. },
  1949. (next) => {
  1950. stationModel.updateOne(
  1951. { _id: stationId },
  1952. { $pull: { queue: { songId: songId } } },
  1953. next
  1954. );
  1955. },
  1956. (res, next) => {
  1957. stations
  1958. .runJob("UPDATE_STATION", { stationId })
  1959. .then((station) => next(null, station))
  1960. .catch(next);
  1961. },
  1962. ],
  1963. async (err, station) => {
  1964. if (err) {
  1965. err = await utils.runJob("GET_ERROR", { error: err });
  1966. console.log(
  1967. "ERROR",
  1968. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1969. `Removing song "${songId}" from station "${stationId}" queue failed. "${err}"`
  1970. );
  1971. return cb({ status: "failure", message: err });
  1972. }
  1973. console.log(
  1974. "SUCCESS",
  1975. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1976. `Removed song "${songId}" from station "${stationId}" successfully.`
  1977. );
  1978. cache.runJob("PUB", {
  1979. channel: "station.queueUpdate",
  1980. value: stationId,
  1981. });
  1982. return cb({
  1983. status: "success",
  1984. message: "Successfully removed song from queue.",
  1985. });
  1986. }
  1987. );
  1988. }
  1989. ),
  1990. /**
  1991. * Gets the queue from a station
  1992. *
  1993. * @param session
  1994. * @param stationId - the station id
  1995. * @param cb
  1996. */
  1997. getQueue: (session, stationId, cb) => {
  1998. async.waterfall(
  1999. [
  2000. (next) => {
  2001. stations
  2002. .runJob("GET_STATION", { stationId })
  2003. .then((station) => next(null, station))
  2004. .catch(next);
  2005. },
  2006. (station, next) => {
  2007. if (!station) return next("Station not found.");
  2008. if (station.type !== "community")
  2009. return next("Station is not a community station.");
  2010. next(null, station);
  2011. },
  2012. (station, next) => {
  2013. stations
  2014. .runJob("CAN_USER_VIEW_STATION", {
  2015. station,
  2016. userId: session.userId,
  2017. })
  2018. .then((canView) => {
  2019. if (canView) return next(null, station);
  2020. return next("Insufficient permissions.");
  2021. })
  2022. .catch((err) => {
  2023. return next(err);
  2024. });
  2025. },
  2026. ],
  2027. async (err, station) => {
  2028. if (err) {
  2029. err = await utils.runJob("GET_ERROR", { error: err });
  2030. console.log(
  2031. "ERROR",
  2032. "STATIONS_GET_QUEUE",
  2033. `Getting queue for station "${stationId}" failed. "${err}"`
  2034. );
  2035. return cb({ status: "failure", message: err });
  2036. }
  2037. console.log(
  2038. "SUCCESS",
  2039. "STATIONS_GET_QUEUE",
  2040. `Got queue for station "${stationId}" successfully.`
  2041. );
  2042. return cb({
  2043. status: "success",
  2044. message: "Successfully got queue.",
  2045. queue: station.queue,
  2046. });
  2047. }
  2048. );
  2049. },
  2050. /**
  2051. * Selects a private playlist for a station
  2052. *
  2053. * @param session
  2054. * @param stationId - the station id
  2055. * @param playlistId - the private playlist id
  2056. * @param cb
  2057. */
  2058. selectPrivatePlaylist: hooks.ownerRequired(
  2059. async (session, stationId, playlistId, cb) => {
  2060. const stationModel = await db.runJob("GET_MODEL", {
  2061. modelName: "station",
  2062. });
  2063. const playlistModel = await db.runJob("GET_MODEL", {
  2064. modelName: "playlist",
  2065. });
  2066. async.waterfall(
  2067. [
  2068. (next) => {
  2069. stations
  2070. .runJob("GET_STATION", { stationId })
  2071. .then((station) => next(null, station))
  2072. .catch(next);
  2073. },
  2074. (station, next) => {
  2075. if (!station) return next("Station not found.");
  2076. if (station.type !== "community")
  2077. return next("Station is not a community station.");
  2078. if (station.privatePlaylist === playlistId)
  2079. return next(
  2080. "That private playlist is already selected."
  2081. );
  2082. playlistModel.findOne({ _id: playlistId }, next);
  2083. },
  2084. (playlist, next) => {
  2085. if (!playlist) return next("Playlist not found.");
  2086. let currentSongIndex =
  2087. playlist.songs.length > 0
  2088. ? playlist.songs.length - 1
  2089. : 0;
  2090. stationModel.updateOne(
  2091. { _id: stationId },
  2092. {
  2093. $set: {
  2094. privatePlaylist: playlistId,
  2095. currentSongIndex: currentSongIndex,
  2096. },
  2097. },
  2098. { runValidators: true },
  2099. next
  2100. );
  2101. },
  2102. (res, next) => {
  2103. stations
  2104. .runJob("UPDATE_STATION", { stationId })
  2105. .then((station) => next(null, station))
  2106. .catch(next);
  2107. },
  2108. ],
  2109. async (err, station) => {
  2110. if (err) {
  2111. err = await utils.runJob("GET_ERROR", { error: err });
  2112. console.log(
  2113. "ERROR",
  2114. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  2115. `Selecting private playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2116. );
  2117. return cb({ status: "failure", message: err });
  2118. }
  2119. console.log(
  2120. "SUCCESS",
  2121. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  2122. `Selected private playlist "${playlistId}" for station "${stationId}" successfully.`
  2123. );
  2124. notifications.runJob("UNSCHEDULE", {
  2125. name: `stations.nextSong?id${stationId}`,
  2126. });
  2127. if (!station.partyMode)
  2128. stations.runJob("SKIP_STATION", { stationId });
  2129. cache.runJob("PUB", {
  2130. channel: "privatePlaylist.selected",
  2131. value: {
  2132. playlistId,
  2133. stationId,
  2134. },
  2135. });
  2136. return cb({
  2137. status: "success",
  2138. message: "Successfully selected playlist.",
  2139. });
  2140. }
  2141. );
  2142. }
  2143. ),
  2144. favoriteStation: hooks.loginRequired(async (session, stationId, cb) => {
  2145. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  2146. async.waterfall(
  2147. [
  2148. (next) => {
  2149. stations
  2150. .runJob("GET_STATION", { stationId })
  2151. .then((station) => next(null, station))
  2152. .catch(next);
  2153. },
  2154. (station, next) => {
  2155. if (!station) return next("Station not found.");
  2156. stations
  2157. .runJob("CAN_USER_VIEW_STATION", {
  2158. station,
  2159. userId: session.userId,
  2160. })
  2161. .then((canView) => {
  2162. if (canView) return next();
  2163. return next("Insufficient permissions.");
  2164. })
  2165. .catch((err) => {
  2166. return next(err);
  2167. });
  2168. },
  2169. (next) => {
  2170. userModel.updateOne(
  2171. { _id: session.userId },
  2172. { $addToSet: { favoriteStations: stationId } },
  2173. next
  2174. );
  2175. },
  2176. (res, next) => {
  2177. if (res.nModified === 0)
  2178. return next("The station was already favorited.");
  2179. next();
  2180. },
  2181. ],
  2182. async (err) => {
  2183. if (err) {
  2184. err = await utils.runJob("GET_ERROR", { error: err });
  2185. console.log(
  2186. "ERROR",
  2187. "FAVORITE_STATION",
  2188. `Favoriting station "${stationId}" failed. "${err}"`
  2189. );
  2190. return cb({ status: "failure", message: err });
  2191. }
  2192. console.log(
  2193. "SUCCESS",
  2194. "FAVORITE_STATION",
  2195. `Favorited station "${stationId}" successfully.`
  2196. );
  2197. cache.runJob("PUB", {
  2198. channel: "user.favoritedStation",
  2199. value: {
  2200. userId: session.userId,
  2201. stationId,
  2202. },
  2203. });
  2204. return cb({
  2205. status: "success",
  2206. message: "Succesfully favorited station.",
  2207. });
  2208. }
  2209. );
  2210. }),
  2211. unfavoriteStation: hooks.loginRequired(async (session, stationId, cb) => {
  2212. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  2213. async.waterfall(
  2214. [
  2215. (next) => {
  2216. userModel.updateOne(
  2217. { _id: session.userId },
  2218. { $pull: { favoriteStations: stationId } },
  2219. next
  2220. );
  2221. },
  2222. (res, next) => {
  2223. if (res.nModified === 0)
  2224. return next("The station wasn't favorited.");
  2225. next();
  2226. },
  2227. ],
  2228. async (err) => {
  2229. if (err) {
  2230. err = await utils.runJob("GET_ERROR", { error: err });
  2231. console.log(
  2232. "ERROR",
  2233. "UNFAVORITE_STATION",
  2234. `Unfavoriting station "${stationId}" failed. "${err}"`
  2235. );
  2236. return cb({ status: "failure", message: err });
  2237. }
  2238. console.log(
  2239. "SUCCESS",
  2240. "UNFAVORITE_STATION",
  2241. `Unfavorited station "${stationId}" successfully.`
  2242. );
  2243. cache.runJob("PUB", {
  2244. channel: "user.unfavoritedStation",
  2245. value: {
  2246. userId: session.userId,
  2247. stationId,
  2248. },
  2249. });
  2250. return cb({
  2251. status: "success",
  2252. message: "Succesfully unfavorited station.",
  2253. });
  2254. }
  2255. );
  2256. }),
  2257. };