stations.js 70 KB


  1. import async from "async";
  2. import mongoose from "mongoose";
  3. import config from "config";
  4. import { hasPermission, useHasPermission } from "../hooks/hasPermission";
  5. import isLoginRequired from "../hooks/loginRequired";
  6. // eslint-disable-next-line
  7. import moduleManager from "../../index";
  // Short aliases for the backend modules this file talks to.
  const {
    db: DBModule,
    utils: UtilsModule,
    ws: WSModule,
    playlists: PlaylistsModule,
    cache: CacheModule,
    notifications: NotificationsModule,
    stations: StationsModule,
    activities: ActivitiesModule
  } = moduleManager.modules;
  // Forwards "station.updateUsers" cache messages to the station's own room as
  // "event:station.users.updated", carrying the new user list.
  CacheModule.runJob("SUB", {
    channel: "station.updateUsers",
    cb: ({ stationId, usersPerStation }) => {
      const payload = { data: { users: usersPerStation } };

      WSModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:station.users.updated", payload]
      });
    }
  });
  // Broadcasts a station's new user count on "station.updateUserCount" messages:
  //  - always to the station's own room;
  //  - to the "home" room as one room-wide emit when the station is public, otherwise
  //    socket by socket, only to sockets that pass the "stations.view" permission check.
  CacheModule.runJob("SUB", {
    channel: "station.updateUserCount",
    cb: ({ stationId, usersPerStationCount }) => {
      // A missing count is reported as 0.
      const count = usersPerStationCount || 0;

      WSModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:station.userCount.updated", { data: { userCount: count } }]
      });

      StationsModule.runJob("GET_STATION", { stationId }).then(async station => {
        if (station.privacy === "public") {
          WSModule.runJob("EMIT_TO_ROOM", {
            room: "home",
            args: ["event:station.userCount.updated", { data: { stationId, userCount: count } }]
          });
          return;
        }

        const sockets = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
          room: "home"
        });

        sockets.forEach(async socketId => {
          const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this);
          if (!socket) return;

          const { session } = socket;

          hasPermission("stations.view", session, stationId)
            .then(() => {
              // Use the same `userCount` key as the public emit above so listeners can
              // handle both paths identically. `count` (the key previously sent here) is
              // kept alongside it so any listener written against the old key still works.
              socket.dispatch("event:station.userCount.updated", {
                data: { stationId, userCount: count, count }
              });
            })
            // A rejected permission check is treated as "not allowed": nothing is sent.
            .catch(() => {});
        });
      });
    }
  });
  // On "station.autofillPlaylist": loads the playlist and announces it to the station
  // room and the manage-station room as "event:station.autofillPlaylist".
  CacheModule.runJob("SUB", {
    channel: "station.autofillPlaylist",
    cb: ({ stationId, playlistId }) => {
      const rooms = [`station.${stationId}`, `manage-station.${stationId}`];

      PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }).then(playlist =>
        WSModule.runJob("EMIT_TO_ROOMS", {
          rooms,
          args: ["event:station.autofillPlaylist", { data: { stationId, playlist } }]
        })
      );
    }
  });
  // On "station.blacklistedPlaylist": loads the playlist and announces it to the station
  // room and the manage-station room as "event:station.blacklistedPlaylist".
  CacheModule.runJob("SUB", {
    channel: "station.blacklistedPlaylist",
    cb: ({ stationId, playlistId }) => {
      const rooms = [`station.${stationId}`, `manage-station.${stationId}`];

      PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }).then(playlist =>
        WSModule.runJob("EMIT_TO_ROOMS", {
          rooms,
          args: ["event:station.blacklistedPlaylist", { data: { stationId, playlist } }]
        })
      );
    }
  });
  // On "station.removedAutofillPlaylist": tells the station room and the manage-station
  // room which playlist id was removed from autofill.
  CacheModule.runJob("SUB", {
    channel: "station.removedAutofillPlaylist",
    cb: ({ stationId, playlistId }) => {
      const rooms = [`station.${stationId}`, `manage-station.${stationId}`];
      const payload = { data: { stationId, playlistId } };

      WSModule.runJob("EMIT_TO_ROOMS", {
        rooms,
        args: ["event:station.removedAutofillPlaylist", payload]
      });
    }
  });
  // On "station.removedBlacklistedPlaylist": tells the station room and the
  // manage-station room which playlist id was removed from the blacklist.
  CacheModule.runJob("SUB", {
    channel: "station.removedBlacklistedPlaylist",
    cb: ({ stationId, playlistId }) => {
      const rooms = [`station.${stationId}`, `manage-station.${stationId}`];
      const payload = { data: { stationId, playlistId } };

      WSModule.runJob("EMIT_TO_ROOMS", {
        rooms,
        args: ["event:station.removedBlacklistedPlaylist", payload]
      });
    }
  });
  // On "station.pause": loads the station, then sends "event:station.pause" to the
  // station room (pausedAt), the manage-station room (stationId + pausedAt), and to each
  // "home" socket returned in `socketsThatCan` (stationId only).
  CacheModule.runJob("SUB", {
    channel: "station.pause",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId }).then(station => {
        const { pausedAt } = station;

        WSModule.runJob("EMIT_TO_ROOM", {
          room: `station.${stationId}`,
          args: ["event:station.pause", { data: { pausedAt } }]
        });

        WSModule.runJob("EMIT_TO_ROOM", {
          room: `manage-station.${stationId}`,
          args: ["event:station.pause", { data: { stationId, pausedAt } }]
        });

        StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
          room: `home`,
          station
        }).then(({ socketsThatCan }) => {
          socketsThatCan.forEach(socket => {
            socket.dispatch("event:station.pause", { data: { stationId } });
          });
        });
      });
    }
  });
  // On "station.resume": loads the station, then sends "event:station.resume" to the
  // station room (timePaused), the manage-station room (stationId + timePaused), and to
  // each "home" socket returned in `socketsThatCan` (stationId only). Failures of the
  // socket lookup/dispatch step are printed with console.log.
  CacheModule.runJob("SUB", {
    channel: "station.resume",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId }).then(station => {
        const { timePaused } = station;

        WSModule.runJob("EMIT_TO_ROOM", {
          room: `station.${stationId}`,
          args: ["event:station.resume", { data: { timePaused } }]
        });

        WSModule.runJob("EMIT_TO_ROOM", {
          room: `manage-station.${stationId}`,
          args: ["event:station.resume", { data: { stationId, timePaused } }]
        });

        StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
          room: `home`,
          station
        })
          .then(({ socketsThatCan }) => {
            socketsThatCan.forEach(socket => {
              socket.dispatch("event:station.resume", { data: { stationId } });
            });
          })
          .catch(console.log);
      });
    }
  });
  // On "station.queueUpdate": loads the station and pushes its queue to the station room
  // ("event:station.queue.updated") and the manage-station room
  // ("event:manageStation.queue.updated").
  CacheModule.runJob("SUB", {
    channel: "station.queueUpdate",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId }).then(station => {
        const nothingPlaying = !station.currentSong;

        // A station with queued songs but no current song gets (re)initialized.
        if (nothingPlaying && station.queue.length > 0) {
          StationsModule.runJob("INITIALIZE_STATION", {
            stationId
          }).then();
        }

        const { queue } = station;

        WSModule.runJob("EMIT_TO_ROOM", {
          room: `station.${stationId}`,
          args: ["event:station.queue.updated", { data: { queue } }]
        });

        WSModule.runJob("EMIT_TO_ROOM", {
          room: `manage-station.${stationId}`,
          args: ["event:manageStation.queue.updated", { data: { stationId, queue } }]
        });
      });
    }
  });
  // On "station.repositionSongInQueue": announces the repositioned song to the station
  // room and (together with the station id) to the manage-station room.
  CacheModule.runJob("SUB", {
    channel: "station.repositionSongInQueue",
    cb: ({ stationId, song }) => {
      WSModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:station.queue.song.repositioned", { data: { song } }]
      });

      WSModule.runJob("EMIT_TO_ROOM", {
        room: `manage-station.${stationId}`,
        args: ["event:manageStation.queue.song.repositioned", { data: { stationId, song } }]
      });
    }
  });
  // On "station.toggleSkipVote": tells the station room which user toggled their skip
  // vote and what the resulting vote state is.
  CacheModule.runJob("SUB", {
    channel: "station.toggleSkipVote",
    cb: ({ stationId, voted, userId }) => {
      const payload = { data: { voted, userId } };

      WSModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:station.toggleSkipVote", payload]
      });
    }
  });
  // On "station.remove": announces the deletion to the station room and manage-station
  // room (no payload), to "home" (with the station id), and to "admin.stations" under the
  // admin-specific event name.
  CacheModule.runJob("SUB", {
    channel: "station.remove",
    cb: stationId => {
      // Rooms scoped to the station itself get the bare event.
      [`station.${stationId}`, `manage-station.${stationId}`].forEach(room => {
        WSModule.runJob("EMIT_TO_ROOM", {
          room,
          args: ["event:station.deleted"]
        });
      });

      WSModule.runJob("EMIT_TO_ROOM", {
        room: `home`,
        args: ["event:station.deleted", { data: { stationId } }]
      });

      WSModule.runJob("EMIT_TO_ROOM", {
        room: "admin.stations",
        args: ["event:admin.station.deleted", { data: { stationId } }]
      });
    }
  });
  // On "station.create": initializes the station, then announces it to "admin.stations"
  // and to "home" - room-wide for public stations, otherwise only to the "home" sockets
  // that pass the "stations.view" permission check.
  CacheModule.runJob("SUB", {
    channel: "station.create",
    cb: async stationId => {
      StationsModule.runJob("INITIALIZE_STATION", { stationId }).then(async ({ station }) => {
        station.userCount = StationsModule.usersPerStationCount[stationId] || 0;

        WSModule.runJob("EMIT_TO_ROOM", {
          room: "admin.stations",
          args: ["event:admin.station.created", { data: { station } }]
        });

        if (station.privacy === "public") {
          WSModule.runJob("EMIT_TO_ROOM", {
            room: "home",
            args: ["event:station.created", { data: { station } }]
          });
          return;
        }

        const socketIds = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
          room: "home"
        });

        socketIds.forEach(async socketId => {
          const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this);
          if (!socket) return;

          hasPermission("stations.view", socket.session, stationId)
            .then(() => {
              socket.dispatch("event:station.created", { data: { station } });
            })
            // A rejected permission check means this socket is simply not notified.
            .catch(() => {});
        });
      });
    }
  });
  // On "station.updated": re-reads a fixed set of station fields from the database and
  //  - emits "event:station.updated" to the station, manage-station and admin.stations rooms;
  //  - dispatches the same event to every "home" socket listed in `socketsThatCan`;
  //  - when the privacy differs from `data.previousStation.privacy`, additionally sends
  //    "event:station.created" to "home" (station became public) or "event:station.deleted"
  //    to every socket in `socketsThatCannot` (station stopped being public).
  CacheModule.runJob("SUB", {
    channel: "station.updated",
    cb: async data => {
      const stationModel = await DBModule.runJob("GET_MODEL", {
        modelName: "station"
      });

      const { stationId } = data;

      stationModel.findOne(
        { _id: stationId },
        ["_id", "name", "displayName", "description", "type", "privacy", "owner", "requests", "autofill", "theme"],
        (err, station) => {
          // Without this guard a failed or empty lookup would broadcast a null station and
          // then throw on `station.privacy` below.
          if (err || !station) {
            console.log(`Unable to load station "${stationId}" to broadcast its update.`, err);
            return;
          }

          WSModule.runJob("EMIT_TO_ROOMS", {
            rooms: [`station.${stationId}`, `manage-station.${stationId}`, "admin.stations"],
            args: ["event:station.updated", { data: { station } }]
          });

          StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
            room: `home`,
            station
          }).then(response => {
            const { socketsThatCan } = response;
            socketsThatCan.forEach(socket => {
              socket.dispatch("event:station.updated", { data: { station } });
            });
          });

          if (data.previousStation && data.previousStation.privacy !== station.privacy) {
            if (station.privacy === "public") {
              // Station became public
              WSModule.runJob("EMIT_TO_ROOM", {
                room: "home",
                args: ["event:station.created", { data: { station } }]
              });
            } else if (data.previousStation.privacy === "public") {
              // Station became hidden
              StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
                room: `home`,
                station
              }).then(response => {
                const { socketsThatCannot } = response;
                socketsThatCannot.forEach(socket => {
                  socket.dispatch("event:station.deleted", { data: { stationId } });
                });
              });
            }
          }
        }
      );
    }
  });
  301. export default {
  302. /**
  303. * Get a list of all the stations
  304. *
  305. * @param {object} session - user session
  306. * @param {boolean} adminFilter - whether to filter out stations admins do not own
  307. * @param {Function} cb - callback
  308. */
  309. async index(session, adminFilter, cb) {
  310. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  311. async.waterfall(
  312. [
  313. // get array of the ids of the user's favorite stations
  314. next => {
  315. if (session.userId)
  316. return userModel.findById(session.userId).select({ favoriteStations: -1 }).exec(next);
  317. return next(null, { favoriteStations: [] });
  318. },
  319. (user, next) => {
  320. const favoriteStations = user ? user.favoriteStations : [];
  321. CacheModule.runJob("HGETALL", { table: "stations" }, this).then(stations =>
  322. next(null, stations, favoriteStations)
  323. );
  324. },
  325. (stations, favorited, next) => {
  326. const filteredStations = [];
  327. async.eachLimit(
  328. stations,
  329. 1,
  330. (station, nextStation) => {
  331. async.waterfall(
  332. [
  333. callback => {
  334. // only relevant if user logged in
  335. if (session.userId) {
  336. if (favorited.indexOf(station._id) !== -1) station.isFavorited = true;
  337. return callback();
  338. }
  339. return callback();
  340. },
  341. callback => {
  342. StationsModule.runJob(
  343. "CAN_USER_VIEW_STATION",
  344. {
  345. station,
  346. userId: session.userId
  347. },
  348. this
  349. )
  350. .then(exists => callback(null, exists))
  351. .catch(callback);
  352. },
  353. (exists, callback) => {
  354. if (!exists) callback(null, false, false);
  355. else if (station.privacy === "public") callback(null, true, true);
  356. else
  357. hasPermission("stations.index", session.userId, station._id)
  358. .then(() => callback(null, true, true))
  359. .catch(() => callback(null, true, false));
  360. },
  361. (exists, canIndex, callback) => {
  362. if (!exists) callback(null, false);
  363. else if (!canIndex && !adminFilter)
  364. hasPermission("stations.index.other", session.userId)
  365. .then(() => callback(null, true))
  366. .catch(() => callback(null, false));
  367. else callback(null, canIndex);
  368. }
  369. ],
  370. (err, exists) => {
  371. if (err) return this.log("ERROR", "STATIONS_INDEX", err);
  372. station.userCount = StationsModule.usersPerStationCount[station._id] || 0;
  373. if (exists) filteredStations.push(station);
  374. return nextStation();
  375. }
  376. );
  377. },
  378. () => next(null, filteredStations, favorited)
  379. );
  380. }
  381. ],
  382. async (err, stations, favorited) => {
  383. if (err) {
  384. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  385. this.log("ERROR", "STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  386. return cb({ status: "error", message: err });
  387. }
  388. this.log("SUCCESS", "STATIONS_INDEX", `Indexing stations successful.`, false);
  389. return cb({ status: "success", data: { stations, favorited } });
  390. }
  391. );
  392. },
  393. /**
  394. * Gets stations, used in the admin stations page by the AdvancedTable component
  395. *
  396. * @param {object} session - the session object automatically added by the websocket
  397. * @param page - the page
  398. * @param pageSize - the size per page
  399. * @param properties - the properties to return for each station
  400. * @param sort - the sort object
  401. * @param queries - the queries array
  402. * @param operator - the operator for queries
  403. * @param cb
  404. */
  405. getData: useHasPermission(
  406. "admin.view.stations",
  407. async function getSet(session, page, pageSize, properties, sort, queries, operator, cb) {
  408. async.waterfall(
  409. [
  410. next => {
  411. DBModule.runJob(
  412. "GET_DATA",
  413. {
  414. page,
  415. pageSize,
  416. properties,
  417. sort,
  418. queries,
  419. operator,
  420. modelName: "station",
  421. blacklistedProperties: [],
  422. specialProperties: {
  423. owner: [
  424. {
  425. $addFields: {
  426. ownerOID: {
  427. $convert: {
  428. input: "$owner",
  429. to: "objectId",
  430. onError: "unknown",
  431. onNull: "unknown"
  432. }
  433. }
  434. }
  435. },
  436. {
  437. $lookup: {
  438. from: "users",
  439. localField: "ownerOID",
  440. foreignField: "_id",
  441. as: "ownerUser"
  442. }
  443. },
  444. {
  445. $unwind: {
  446. path: "$ownerUser",
  447. preserveNullAndEmptyArrays: true
  448. }
  449. },
  450. {
  451. $addFields: {
  452. ownerUsername: {
  453. $cond: [
  454. { $eq: [{ $type: "$owner" }, "string"] },
  455. { $ifNull: ["$ownerUser.username", "unknown"] },
  456. "none"
  457. ]
  458. }
  459. }
  460. },
  461. {
  462. $project: {
  463. ownerOID: 0,
  464. ownerUser: 0
  465. }
  466. }
  467. ]
  468. },
  469. specialQueries: {
  470. owner: newQuery => ({ $or: [newQuery, { ownerUsername: newQuery.owner }] })
  471. }
  472. },
  473. this
  474. )
  475. .then(response => {
  476. next(null, response);
  477. })
  478. .catch(err => {
  479. next(err);
  480. });
  481. }
  482. ],
  483. async (err, response) => {
  484. if (err) {
  485. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  486. this.log("ERROR", "STATIONS_GET_DATA", `Failed to get data from stations. "${err}"`);
  487. return cb({ status: "error", message: err });
  488. }
  489. this.log("SUCCESS", "STATIONS_GET_DATA", `Got data from stations successfully.`);
  490. return cb({ status: "success", message: "Successfully got data from stations.", data: response });
  491. }
  492. );
  493. }
  494. ),
  495. /**
  496. * Obtains basic metadata of a station in order to format an activity
  497. *
  498. * @param {object} session - user session
  499. * @param {string} stationId - the station id
  500. * @param {Function} cb - callback
  501. */
  502. getStationForActivity(session, stationId, cb) {
  503. async.waterfall(
  504. [
  505. next => {
  506. StationsModule.runJob("GET_STATION", { stationId }, this)
  507. .then(station => {
  508. next(null, station);
  509. })
  510. .catch(next);
  511. }
  512. ],
  513. async (err, station) => {
  514. if (err) {
  515. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  516. this.log(
  517. "ERROR",
  518. "STATIONS_GET_STATION_FOR_ACTIVITY",
  519. `Failed to obtain metadata of station ${stationId} for activity formatting. "${err}"`
  520. );
  521. return cb({ status: "error", message: err });
  522. }
  523. this.log(
  524. "SUCCESS",
  525. "STATIONS_GET_STATION_FOR_ACTIVITY",
  526. `Obtained metadata of station ${stationId} for activity formatting successfully.`
  527. );
  528. return cb({
  529. status: "success",
  530. data: {
  531. title: station.displayName,
  532. thumbnail: station.currentSong ? station.currentSong.thumbnail : ""
  533. }
  534. });
  535. }
  536. );
  537. },
  538. /**
  539. * Verifies that a station exists from its name
  540. *
  541. * @param {object} session - user session
  542. * @param {string} stationName - the station name
  543. * @param {Function} cb - callback
  544. */
  545. existsByName(session, stationName, cb) {
  546. async.waterfall(
  547. [
  548. next => {
  549. StationsModule.runJob("GET_STATION_BY_NAME", { stationName }, this)
  550. .then(station => next(null, station))
  551. .catch(next);
  552. },
  553. (station, next) => {
  554. if (!station) return next(null, false);
  555. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  556. .then(exists => next(null, exists))
  557. .catch(next);
  558. }
  559. ],
  560. async (err, exists) => {
  561. if (err) {
  562. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  563. this.log(
  564. "ERROR",
  565. "STATION_EXISTS_BY_NAME",
  566. `Checking if station "${stationName}" exists failed. "${err}"`
  567. );
  568. return cb({ status: "error", message: err });
  569. }
  570. this.log(
  571. "SUCCESS",
  572. "STATION_EXISTS_BY_NAME",
  573. `Station "${stationName}" exists successfully.` /* , false */
  574. );
  575. return cb({ status: "success", data: { exists } });
  576. }
  577. );
  578. },
  579. /**
  580. * Verifies that a station exists from its id
  581. *
  582. * @param {object} session - user session
  583. * @param {string} stationId - the station id
  584. * @param {Function} cb - callback
  585. */
  586. existsById(session, stationId, cb) {
  587. async.waterfall(
  588. [
  589. next => {
  590. StationsModule.runJob("GET_STATION", { stationId }, this)
  591. .then(station => next(null, station))
  592. .catch(next);
  593. },
  594. (station, next) => {
  595. if (!station) return next(null, false);
  596. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  597. .then(exists => next(null, exists))
  598. .catch(next);
  599. }
  600. ],
  601. async (err, exists) => {
  602. if (err) {
  603. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  604. this.log(
  605. "ERROR",
  606. "STATION_EXISTS_BY_ID",
  607. `Checking if station "${stationId}" exists failed. "${err}"`
  608. );
  609. return cb({ status: "error", message: err });
  610. }
  611. this.log(
  612. "SUCCESS",
  613. "STATION_EXISTS_BY_ID",
  614. `Station "${stationId}" exists successfully.` /* , false */
  615. );
  616. return cb({ status: "success", data: { exists } });
  617. }
  618. );
  619. },
  620. /**
  621. * Gets the official playlist for a station
  622. *
  623. * @param {object} session - user session
  624. * @param {string} stationId - the station id
  625. * @param {Function} cb - callback
  626. */
  627. getPlaylist(session, stationId, cb) {
  628. async.waterfall(
  629. [
  630. next => {
  631. StationsModule.runJob("GET_STATION", { stationId }, this)
  632. .then(station => {
  633. next(null, station);
  634. })
  635. .catch(next);
  636. },
  637. (station, next) => {
  638. StationsModule.runJob(
  639. "CAN_USER_VIEW_STATION",
  640. {
  641. station,
  642. userId: session.userId
  643. },
  644. this
  645. )
  646. .then(canView => {
  647. if (canView) return next(null, station);
  648. return next("Insufficient permissions.");
  649. })
  650. .catch(err => next(err));
  651. },
  652. (station, next) => {
  653. if (!station) return next("Station not found.");
  654. if (station.type !== "official") return next("This is not an official station.");
  655. return next();
  656. },
  657. next => {
  658. CacheModule.runJob(
  659. "HGET",
  660. {
  661. table: "officialPlaylists",
  662. key: stationId
  663. },
  664. this
  665. )
  666. .then(playlist => {
  667. next(null, playlist);
  668. })
  669. .catch(next);
  670. },
  671. (playlist, next) => {
  672. if (!playlist) return next("Playlist not found.");
  673. return next(null, playlist);
  674. }
  675. ],
  676. async (err, playlist) => {
  677. if (err) {
  678. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  679. this.log(
  680. "ERROR",
  681. "STATIONS_GET_PLAYLIST",
  682. `Getting playlist for station "${stationId}" failed. "${err}"`
  683. );
  684. return cb({ status: "error", message: err });
  685. }
  686. this.log(
  687. "SUCCESS",
  688. "STATIONS_GET_PLAYLIST",
  689. `Got playlist for station "${stationId}" successfully.`,
  690. false
  691. );
  692. return cb({ status: "success", data: { songs: playlist.songs } });
  693. }
  694. );
  695. },
  696. /**
  697. * Joins the station by its name
  698. *
  699. * @param {object} session - user session
  700. * @param {string} stationIdentifier - the station name or station id
  701. * @param {Function} cb - callback
  702. */
  703. async join(session, stationIdentifier, cb) {
  704. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  705. async.waterfall(
  706. [
  707. next => {
  708. StationsModule.runJob("GET_STATION_BY_NAME", { stationName: stationIdentifier }, this)
  709. .then(station => next(null, station))
  710. .catch(() =>
  711. // station identifier may be using stationid instead
  712. StationsModule.runJob("GET_STATION", { stationId: stationIdentifier }, this)
  713. .then(station => next(null, station))
  714. .catch(next)
  715. );
  716. },
  717. (station, next) => {
  718. if (!station) return next("Station not found.");
  719. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  720. .then(canView => {
  721. if (!canView) next("Not allowed to join station.");
  722. else next(null, station);
  723. })
  724. .catch(err => next(err));
  725. },
  726. (station, next) => {
  727. WSModule.runJob("SOCKET_JOIN_ROOM", {
  728. socketId: session.socketId,
  729. room: `station.${station._id}`
  730. });
  731. const data = {
  732. _id: station._id,
  733. type: station.type,
  734. currentSong: station.currentSong,
  735. startedAt: station.startedAt,
  736. paused: station.paused,
  737. timePaused: station.timePaused,
  738. pausedAt: station.pausedAt,
  739. description: station.description,
  740. displayName: station.displayName,
  741. name: station.name,
  742. privacy: station.privacy,
  743. requests: station.requests,
  744. autofill: station.autofill,
  745. owner: station.owner,
  746. blacklist: station.blacklist,
  747. theme: station.theme,
  748. djs: station.djs
  749. };
  750. StationsModule.userList[session.socketId] = station._id;
  751. next(null, data);
  752. },
  753. (data, next) => {
  754. userModel.find({ _id: { $in: data.djs } }, (err, users) => {
  755. if (err) next(err);
  756. else {
  757. data.djs = users.map(user => {
  758. const { _id, name, username, avatar } = user._doc;
  759. return { _id, name, username, avatar };
  760. });
  761. next(null, data);
  762. }
  763. });
  764. },
  765. (data, next) => {
  766. data = JSON.parse(JSON.stringify(data));
  767. data.userCount = StationsModule.usersPerStationCount[data._id] || 0;
  768. data.users = StationsModule.usersPerStation[data._id] || [];
  769. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  770. WSModule.runJob("SOCKET_JOIN_SONG_ROOM", {
  771. socketId: session.socketId,
  772. room: `song.${data.currentSong.youtubeId}`
  773. });
  774. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  775. return next(null, data);
  776. },
  777. (data, next) => {
  778. // only relevant if user logged in
  779. if (session.userId) {
  780. return StationsModule.runJob(
  781. "HAS_USER_FAVORITED_STATION",
  782. {
  783. userId: session.userId,
  784. stationId: data._id
  785. },
  786. this
  787. )
  788. .then(isStationFavorited => {
  789. data.isFavorited = isStationFavorited;
  790. return next(null, data);
  791. })
  792. .catch(err => next(err));
  793. }
  794. return next(null, data);
  795. }
  796. ],
  797. async (err, data) => {
  798. if (err) {
  799. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  800. this.log("ERROR", "STATIONS_JOIN", `Joining station "${stationIdentifier}" failed. "${err}"`);
  801. return cb({ status: "error", message: err });
  802. }
  803. this.log("SUCCESS", "STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  804. return cb({ status: "success", data });
  805. }
  806. );
  807. },
  808. /**
  809. * Gets a station by id
  810. *
  811. * @param {object} session - user session
  812. * @param {string} stationId - the station id
  813. * @param {Function} cb - callback
  814. */
  815. getStationById(session, stationId, cb) {
  816. async.waterfall(
  817. [
  818. next => {
  819. StationsModule.runJob("GET_STATION", { stationId }, this)
  820. .then(station => {
  821. next(null, station);
  822. })
  823. .catch(next);
  824. },
  825. (station, next) => {
  826. if (!station) return next("Station not found.");
  827. return StationsModule.runJob(
  828. "CAN_USER_VIEW_STATION",
  829. {
  830. station,
  831. userId: session.userId
  832. },
  833. this
  834. )
  835. .then(canView => {
  836. if (!canView) next("Not allowed to get station.");
  837. else next(null, station);
  838. })
  839. .catch(err => next(err));
  840. },
  841. (station, next) => {
  842. // only relevant if user logged in
  843. if (session.userId) {
  844. return StationsModule.runJob(
  845. "HAS_USER_FAVORITED_STATION",
  846. {
  847. userId: session.userId,
  848. stationId
  849. },
  850. this
  851. )
  852. .then(isStationFavorited => {
  853. station.isFavorited = isStationFavorited;
  854. return next(null, station);
  855. })
  856. .catch(err => next(err));
  857. }
  858. return next(null, station);
  859. },
  860. (station, next) => {
  861. const data = {
  862. _id: station._id,
  863. type: station.type,
  864. description: station.description,
  865. displayName: station.displayName,
  866. name: station.name,
  867. privacy: station.privacy,
  868. requests: station.requests,
  869. autofill: station.autofill,
  870. owner: station.owner,
  871. theme: station.theme,
  872. paused: station.paused,
  873. currentSong: station.currentSong,
  874. isFavorited: station.isFavorited,
  875. djs: station.djs
  876. };
  877. next(null, data);
  878. }
  879. ],
  880. async (err, data) => {
  881. if (err) {
  882. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  883. this.log("ERROR", "GET_STATION_BY_ID", `Getting station "${stationId}" failed. "${err}"`);
  884. return cb({ status: "error", message: err });
  885. }
  886. this.log("SUCCESS", "GET_STATION_BY_ID", `Got station "${stationId}" successfully.`);
  887. return cb({ status: "success", data: { station: data } });
  888. }
  889. );
  890. },
  891. getStationAutofillPlaylistsById(session, stationId, cb) {
  892. async.waterfall(
  893. [
  894. next => {
  895. StationsModule.runJob("GET_STATION", { stationId }, this)
  896. .then(station => {
  897. next(null, station);
  898. })
  899. .catch(next);
  900. },
  901. (station, next) => {
  902. if (!station) return next("Station not found.");
  903. return StationsModule.runJob(
  904. "CAN_USER_VIEW_STATION",
  905. {
  906. station,
  907. userId: session.userId
  908. },
  909. this
  910. )
  911. .then(canView => {
  912. if (!canView) next("Not allowed to get station.");
  913. else next(null, station);
  914. })
  915. .catch(err => next(err));
  916. },
  917. (station, next) => {
  918. const playlists = [];
  919. async.eachLimit(
  920. station.autofill.playlists,
  921. 1,
  922. (playlistId, next) => {
  923. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
  924. .then(playlist => {
  925. playlists.push(playlist);
  926. next();
  927. })
  928. .catch(() => {
  929. playlists.push(null);
  930. next();
  931. });
  932. },
  933. err => {
  934. next(err, playlists);
  935. }
  936. );
  937. }
  938. ],
  939. async (err, playlists) => {
  940. if (err) {
  941. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  942. this.log(
  943. "ERROR",
  944. "GET_STATION_AUTOFILL_PLAYLISTS_BY_ID",
  945. `Getting station "${stationId}"'s autofilling playlists failed. "${err}"`
  946. );
  947. return cb({ status: "error", message: err });
  948. }
  949. this.log(
  950. "SUCCESS",
  951. "GET_STATION_AUTOFILL_PLAYLISTS_BY_ID",
  952. `Got station "${stationId}"'s autofilling playlists successfully.`
  953. );
  954. return cb({ status: "success", data: { playlists } });
  955. }
  956. );
  957. },
  958. getStationBlacklistById(session, stationId, cb) {
  959. async.waterfall(
  960. [
  961. next => {
  962. StationsModule.runJob("GET_STATION", { stationId }, this)
  963. .then(station => {
  964. next(null, station);
  965. })
  966. .catch(next);
  967. },
  968. (station, next) => {
  969. if (!station) return next("Station not found.");
  970. return StationsModule.runJob(
  971. "CAN_USER_VIEW_STATION",
  972. {
  973. station,
  974. userId: session.userId
  975. },
  976. this
  977. )
  978. .then(canView => {
  979. if (!canView) next("Not allowed to get station.");
  980. else next(null, station);
  981. })
  982. .catch(err => next(err));
  983. },
  984. (station, next) => {
  985. const playlists = [];
  986. async.eachLimit(
  987. station.blacklist,
  988. 1,
  989. (playlistId, next) => {
  990. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
  991. .then(playlist => {
  992. playlists.push(playlist);
  993. next();
  994. })
  995. .catch(() => {
  996. playlists.push(null);
  997. next();
  998. });
  999. },
  1000. err => {
  1001. next(err, playlists);
  1002. }
  1003. );
  1004. }
  1005. ],
  1006. async (err, playlists) => {
  1007. if (err) {
  1008. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1009. this.log(
  1010. "ERROR",
  1011. "GET_STATION_BLACKLIST_BY_ID",
  1012. `Getting station "${stationId}"'s blacklist failed. "${err}"`
  1013. );
  1014. return cb({ status: "error", message: err });
  1015. }
  1016. this.log(
  1017. "SUCCESS",
  1018. "GET_STATION_BLACKLIST_BY_ID",
  1019. `Got station "${stationId}"'s blacklist successfully.`
  1020. );
  1021. return cb({ status: "success", data: { playlists } });
  1022. }
  1023. );
  1024. },
  1025. /**
  1026. * Toggle votes to skip a station
  1027. *
  1028. * @param session
  1029. * @param stationId - the station id
  1030. * @param cb
  1031. */
  1032. toggleSkipVote: isLoginRequired(async function toggleSkipVote(session, stationId, cb) {
  1033. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1034. async.waterfall(
  1035. [
  1036. next => {
  1037. StationsModule.runJob("GET_STATION", { stationId }, this)
  1038. .then(station => next(null, station))
  1039. .catch(next);
  1040. },
  1041. (station, next) => {
  1042. if (!station) return next("Station not found.");
  1043. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  1044. .then(canView => {
  1045. if (canView) return next(null, station);
  1046. return next("Insufficient permissions.");
  1047. })
  1048. .catch(err => next(err));
  1049. },
  1050. (station, next) => {
  1051. if (!station.currentSong) return next("There is currently no song to skip.");
  1052. const query = {};
  1053. const voted = station.currentSong.skipVotes.indexOf(session.userId) !== -1;
  1054. if (!voted) query.$push = { "currentSong.skipVotes": session.userId };
  1055. else query.$pull = { "currentSong.skipVotes": session.userId };
  1056. return stationModel.updateOne({ _id: stationId }, query, err => {
  1057. if (err) next(err);
  1058. else next(null, !voted);
  1059. });
  1060. },
  1061. (voted, next) => {
  1062. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1063. .then(station => {
  1064. next(null, station, voted);
  1065. })
  1066. .catch(next);
  1067. },
  1068. (station, voted, next) => {
  1069. if (!station) return next("Station not found.");
  1070. return StationsModule.runJob("PROCESS_SKIP_VOTES", { stationId }, this)
  1071. .then(() => next(null, voted))
  1072. .catch(next);
  1073. }
  1074. ],
  1075. async (err, voted) => {
  1076. if (err) {
  1077. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1078. this.log(
  1079. "ERROR",
  1080. "STATIONS_TOGGLE_SKIP_VOTE",
  1081. `Toggling skip vote on "${stationId}" failed. "${err}"`
  1082. );
  1083. return cb({ status: "error", message: err });
  1084. }
  1085. this.log("SUCCESS", "STATIONS_TOGGLE_SKIP_VOTE", `Toggling skip vote on "${stationId}" successful.`);
  1086. CacheModule.runJob("PUB", {
  1087. channel: "station.toggleSkipVote",
  1088. value: { stationId, voted, userId: session.userId }
  1089. });
  1090. return cb({
  1091. status: "success",
  1092. message: voted
  1093. ? "Successfully voted to skip the song."
  1094. : "Successfully removed vote to skip the song.",
  1095. data: { voted }
  1096. });
  1097. }
  1098. );
  1099. }),
  1100. /**
  1101. * Force skips a station
  1102. *
  1103. * @param {object} session - user session
  1104. * @param {string} stationId - the station id
  1105. * @param {Function} cb - callback
  1106. */
  1107. forceSkip(session, stationId, cb) {
  1108. async.waterfall(
  1109. [
  1110. next => {
  1111. hasPermission("stations.skip", session, stationId)
  1112. .then(() => next())
  1113. .catch(next);
  1114. },
  1115. next => {
  1116. StationsModule.runJob("GET_STATION", { stationId }, this)
  1117. .then(station => {
  1118. next(null, station);
  1119. })
  1120. .catch(next);
  1121. },
  1122. (station, next) => {
  1123. if (!station) return next("Station not found.");
  1124. return next();
  1125. }
  1126. ],
  1127. async err => {
  1128. if (err) {
  1129. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1130. this.log("ERROR", "STATIONS_FORCE_SKIP", `Force skipping station "${stationId}" failed. "${err}"`);
  1131. return cb({ status: "error", message: err });
  1132. }
  1133. StationsModule.runJob("SKIP_STATION", { stationId, natural: false });
  1134. this.log("SUCCESS", "STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  1135. return cb({
  1136. status: "success",
  1137. message: "Successfully skipped station."
  1138. });
  1139. }
  1140. );
  1141. },
  1142. /**
  1143. * Leaves the user's current station
  1144. *
  1145. * @param {object} session - user session
  1146. * @param {string} stationId - id of station to leave
  1147. * @param {Function} cb - callback
  1148. */
  1149. leave(session, stationId, cb) {
  1150. async.waterfall(
  1151. [
  1152. next => {
  1153. StationsModule.runJob("GET_STATION", { stationId }, this)
  1154. .then(station => next(null, station))
  1155. .catch(next);
  1156. },
  1157. (station, next) => {
  1158. if (!station) return next("Station not found.");
  1159. return next();
  1160. }
  1161. ],
  1162. async (err, userCount) => {
  1163. if (err) {
  1164. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1165. this.log("ERROR", "STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  1166. return cb({ status: "error", message: err });
  1167. }
  1168. this.log("SUCCESS", "STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  1169. WSModule.runJob("SOCKET_LEAVE_ROOM", { socketId: session.socketId, room: `station.${stationId}` });
  1170. WSModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", { sockets: [session.socketId] });
  1171. delete StationsModule.userList[session.socketId];
  1172. return cb({
  1173. status: "success",
  1174. message: "Successfully left station.",
  1175. data: { userCount }
  1176. });
  1177. }
  1178. );
  1179. },
  1180. /**
  1181. * Updates a station's settings
  1182. *
  1183. * @param {object} session - user session
  1184. * @param {string} stationId - the station id
  1185. * @param {object} newStation - updated station object
  1186. * @param {Function} cb - callback
  1187. */
  1188. async update(session, stationId, newStation, cb) {
  1189. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1190. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  1191. async.waterfall(
  1192. [
  1193. next => {
  1194. hasPermission("stations.update", session, stationId)
  1195. .then(() => next())
  1196. .catch(next);
  1197. },
  1198. next => {
  1199. stationModel.findOne({ _id: stationId }, next);
  1200. },
  1201. (previousStation, next) => {
  1202. const { name, displayName, description, privacy, requests, autofill, theme } = newStation;
  1203. const { enabled, limit, mode } = autofill;
  1204. // This object makes sure only certain properties can be changed by a user
  1205. const setObject = {
  1206. name,
  1207. displayName,
  1208. description,
  1209. privacy,
  1210. requests,
  1211. "autofill.enabled": enabled,
  1212. "autofill.limit": limit,
  1213. "autofill.mode": mode,
  1214. theme
  1215. };
  1216. stationModel.updateOne({ _id: stationId }, { $set: setObject }, { runValidators: true }, err => {
  1217. next(err, previousStation);
  1218. });
  1219. },
  1220. (previousStation, next) => {
  1221. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1222. .then(station => next(null, station, previousStation))
  1223. .catch(next);
  1224. },
  1225. (station, previousStation, next) => {
  1226. if (
  1227. newStation.autofill.enabled &&
  1228. JSON.stringify(newStation.autofill) !== JSON.stringify(previousStation.autofill)
  1229. )
  1230. StationsModule.runJob("AUTOFILL_STATION", { stationId }, this)
  1231. .then(() => {
  1232. CacheModule.runJob("PUB", {
  1233. channel: "station.queueUpdate",
  1234. value: stationId
  1235. })
  1236. .then(() => next(null, station, previousStation))
  1237. .catch(next);
  1238. })
  1239. .catch(err => {
  1240. if (err === "Autofill is disabled in this station" || err === "Autofill limit reached")
  1241. next(null, station, previousStation);
  1242. else next(err);
  1243. });
  1244. else next(null, station, previousStation);
  1245. },
  1246. (station, previousStation, next) => {
  1247. playlistModel.updateOne(
  1248. { _id: station.playlist },
  1249. { $set: { displayName: `Station - ${station.displayName}` } },
  1250. err => {
  1251. next(err, station, previousStation);
  1252. }
  1253. );
  1254. }
  1255. ],
  1256. async (err, station, previousStation) => {
  1257. if (err) {
  1258. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1259. this.log("ERROR", "STATIONS_UPDATE", `Updating station "${stationId}" failed. "${err}"`);
  1260. return cb({ status: "error", message: err });
  1261. }
  1262. this.log("SUCCESS", "STATIONS_UPDATE", `Updated station "${stationId}" successfully.`);
  1263. CacheModule.runJob("PUB", {
  1264. channel: "station.updated",
  1265. value: { stationId, previousStation }
  1266. });
  1267. return cb({
  1268. status: "success",
  1269. message: "Successfully updated the station."
  1270. });
  1271. }
  1272. );
  1273. },
  1274. /**
  1275. * Pauses a station
  1276. *
  1277. * @param {object} session - user session
  1278. * @param {string} stationId - the station id
  1279. * @param {Function} cb - callback
  1280. */
  1281. async pause(session, stationId, cb) {
  1282. const stationModel = await DBModule.runJob(
  1283. "GET_MODEL",
  1284. {
  1285. modelName: "station"
  1286. },
  1287. this
  1288. );
  1289. async.waterfall(
  1290. [
  1291. next => {
  1292. hasPermission("stations.playback.toggle", session, stationId)
  1293. .then(() => next())
  1294. .catch(next);
  1295. },
  1296. next => {
  1297. StationsModule.runJob("GET_STATION", { stationId }, this)
  1298. .then(station => {
  1299. next(null, station);
  1300. })
  1301. .catch(next);
  1302. },
  1303. (station, next) => {
  1304. if (!station) return next("Station not found.");
  1305. if (station.paused) return next("That station was already paused.");
  1306. return stationModel.updateOne(
  1307. { _id: stationId },
  1308. { $set: { paused: true, pausedAt: Date.now() } },
  1309. next
  1310. );
  1311. },
  1312. (res, next) => {
  1313. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1314. .then(() => next())
  1315. .catch(next);
  1316. }
  1317. ],
  1318. async err => {
  1319. if (err) {
  1320. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1321. this.log("ERROR", "STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  1322. return cb({ status: "error", message: err });
  1323. }
  1324. this.log("SUCCESS", "STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  1325. CacheModule.runJob("PUB", {
  1326. channel: "station.pause",
  1327. value: stationId
  1328. });
  1329. NotificationsModule.runJob("UNSCHEDULE", {
  1330. name: `stations.nextSong?id=${stationId}`
  1331. });
  1332. return cb({
  1333. status: "success",
  1334. message: "Successfully paused."
  1335. });
  1336. }
  1337. );
  1338. },
  1339. /**
  1340. * Resumes a station
  1341. *
  1342. * @param {object} session - user session
  1343. * @param {string} stationId - the station id
  1344. * @param {Function} cb - callback
  1345. */
  1346. async resume(session, stationId, cb) {
  1347. const stationModel = await DBModule.runJob(
  1348. "GET_MODEL",
  1349. {
  1350. modelName: "station"
  1351. },
  1352. this
  1353. );
  1354. async.waterfall(
  1355. [
  1356. next => {
  1357. hasPermission("stations.playback.toggle", session, stationId)
  1358. .then(() => next())
  1359. .catch(next);
  1360. },
  1361. next => {
  1362. StationsModule.runJob("GET_STATION", { stationId }, this)
  1363. .then(station => {
  1364. next(null, station);
  1365. })
  1366. .catch(next);
  1367. },
  1368. (station, next) => {
  1369. if (!station) return next("Station not found.");
  1370. if (!station.paused) return next("That station is not paused.");
  1371. station.timePaused += Date.now() - station.pausedAt;
  1372. return stationModel.updateOne(
  1373. { _id: stationId },
  1374. {
  1375. $set: { paused: false },
  1376. $inc: { timePaused: Date.now() - station.pausedAt }
  1377. },
  1378. next
  1379. );
  1380. },
  1381. (res, next) => {
  1382. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1383. .then(() => next())
  1384. .catch(next);
  1385. },
  1386. next => {
  1387. StationsModule.runJob("PROCESS_SKIP_VOTES", { stationId }, this)
  1388. .then(() => next())
  1389. .catch(next);
  1390. }
  1391. ],
  1392. async err => {
  1393. if (err) {
  1394. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1395. this.log("ERROR", "STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  1396. return cb({ status: "error", message: err });
  1397. }
  1398. this.log("SUCCESS", "STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  1399. CacheModule.runJob("PUB", {
  1400. channel: "station.resume",
  1401. value: stationId
  1402. });
  1403. return cb({
  1404. status: "success",
  1405. message: "Successfully resumed."
  1406. });
  1407. }
  1408. );
  1409. },
  1410. /**
  1411. * Removes a station
  1412. *
  1413. * @param {object} session - user session
  1414. * @param {string} stationId - the station id
  1415. * @param {Function} cb - callback
  1416. */
  1417. async remove(session, stationId, cb) {
  1418. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1419. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  1420. async.waterfall(
  1421. [
  1422. next => {
  1423. hasPermission("stations.remove", session, stationId)
  1424. .then(() => next())
  1425. .catch(next);
  1426. },
  1427. next => {
  1428. stationModel.findById(stationId, (err, station) => {
  1429. if (err) return next(err);
  1430. return next(null, station);
  1431. });
  1432. },
  1433. (station, next) => {
  1434. stationModel.deleteOne({ _id: stationId }, err => next(err, station));
  1435. },
  1436. (station, next) => {
  1437. CacheModule.runJob("HDEL", { table: "stations", key: stationId }, this)
  1438. .then(() => next(null, station))
  1439. .catch(next);
  1440. },
  1441. // remove the playlist for the station
  1442. (station, next) => {
  1443. if (station.playlist)
  1444. PlaylistsModule.runJob("DELETE_PLAYLIST", { playlistId: station.playlist })
  1445. .then(() => {})
  1446. .catch(next);
  1447. next(null, station);
  1448. },
  1449. // remove reference to the station id in any array of a user's favorite stations
  1450. (station, next) => {
  1451. userModel.updateMany(
  1452. { favoriteStations: stationId },
  1453. { $pull: { favoriteStations: stationId } },
  1454. err => next(err, station)
  1455. );
  1456. }
  1457. ],
  1458. async (err, station) => {
  1459. if (err) {
  1460. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1461. this.log("ERROR", "STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  1462. return cb({ status: "error", message: err });
  1463. }
  1464. this.log("SUCCESS", "STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  1465. CacheModule.runJob("PUB", {
  1466. channel: "station.remove",
  1467. value: stationId
  1468. });
  1469. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1470. userId: session.userId,
  1471. type: "station__remove",
  1472. payload: { message: `Removed a station named ${station.displayName}` }
  1473. });
  1474. ActivitiesModule.runJob("REMOVE_ACTIVITY_REFERENCES", { type: "stationId", stationId });
  1475. return cb({
  1476. status: "success",
  1477. message: "Successfully removed."
  1478. });
  1479. }
  1480. );
  1481. },
  1482. /**
  1483. * Create a station
  1484. *
  1485. * @param session
  1486. * @param data - the station data
  1487. * @param cb
  1488. */
  1489. create: isLoginRequired(async function create(session, data, cb) {
  1490. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1491. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  1492. data.name = data.name.toLowerCase();
  1493. let blacklist = [
  1494. "about",
  1495. "support",
  1496. "staff",
  1497. "help",
  1498. "news",
  1499. "terms",
  1500. "privacy",
  1501. "profile",
  1502. "c",
  1503. "community",
  1504. "tos",
  1505. "login",
  1506. "register",
  1507. "p",
  1508. "official",
  1509. "o",
  1510. "faq",
  1511. "team",
  1512. "donate",
  1513. "buy",
  1514. "shop",
  1515. "forums",
  1516. "explore",
  1517. "settings",
  1518. "admin",
  1519. "auth",
  1520. "reset_password",
  1521. "backend",
  1522. "api",
  1523. "songs",
  1524. "playlists",
  1525. "playlist",
  1526. "albums",
  1527. "artists",
  1528. "artist",
  1529. "station"
  1530. ];
  1531. if (data.type === "community" && config.has("blacklistedCommunityStationNames"))
  1532. blacklist = [...blacklist, ...config.get("blacklistedCommunityStationNames")];
  1533. async.waterfall(
  1534. [
  1535. next => {
  1536. if (!data) return next("Invalid data.");
  1537. return next();
  1538. },
  1539. next => {
  1540. stationModel.findOne(
  1541. {
  1542. $or: [{ name: data.name }, { displayName: new RegExp(`^${data.displayName}$`, "i") }]
  1543. },
  1544. next
  1545. );
  1546. },
  1547. (station, next) => {
  1548. this.log(station);
  1549. if (station) return next("A station with that name or display name already exists.");
  1550. if (blacklist.indexOf(data.name) !== -1)
  1551. return next("That name is blacklisted. Please use a different name.");
  1552. if (data.type === "official")
  1553. return hasPermission("stations.create.official", session)
  1554. .then(() => next())
  1555. .catch(() => next("Insufficient permissions."));
  1556. return next();
  1557. },
  1558. next => {
  1559. const stationId = mongoose.Types.ObjectId();
  1560. playlistModel.create(
  1561. {
  1562. displayName: `Station - ${data.name}`,
  1563. songs: [],
  1564. createdBy: data.type === "official" ? "Musare" : session.userId,
  1565. createdFor: `${stationId}`,
  1566. createdAt: Date.now(),
  1567. type: "station"
  1568. },
  1569. (err, playlist) => {
  1570. next(err, playlist, stationId);
  1571. }
  1572. );
  1573. },
  1574. (playlist, stationId, next) => {
  1575. const { name, displayName, description, type } = data;
  1576. if (type === "official") {
  1577. stationModel.create(
  1578. {
  1579. _id: stationId,
  1580. name,
  1581. displayName,
  1582. description,
  1583. playlist: playlist._id,
  1584. type,
  1585. privacy: "private",
  1586. queue: [],
  1587. currentSong: null
  1588. },
  1589. next
  1590. );
  1591. } else {
  1592. stationModel.create(
  1593. {
  1594. _id: stationId,
  1595. name,
  1596. displayName,
  1597. description,
  1598. playlist: playlist._id,
  1599. type,
  1600. privacy: "private",
  1601. owner: session.userId,
  1602. queue: [],
  1603. currentSong: null
  1604. },
  1605. next
  1606. );
  1607. }
  1608. }
  1609. ],
  1610. async (err, station) => {
  1611. if (err) {
  1612. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1613. this.log("ERROR", "STATIONS_CREATE", `Creating station failed. "${err}"`);
  1614. cb({ status: "error", message: err });
  1615. } else {
  1616. this.log("SUCCESS", "STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  1617. CacheModule.runJob("PUB", {
  1618. channel: "station.create",
  1619. value: station._id
  1620. });
  1621. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1622. userId: session.userId,
  1623. type: "station__create",
  1624. payload: {
  1625. message: `Created a station named <stationId>${station.displayName}</stationId>`,
  1626. stationId: station._id
  1627. }
  1628. });
  1629. cb({
  1630. status: "success",
  1631. message: "Successfully created station."
  1632. });
  1633. }
  1634. }
  1635. );
  1636. }),
  1637. /**
  1638. * Adds song to station queue
  1639. *
  1640. * @param session
  1641. * @param stationId - the station id
  1642. * @param youtubeId - the song id
  1643. * @param cb
  1644. */
  1645. addToQueue: isLoginRequired(async function addToQueue(session, stationId, youtubeId, cb) {
  1646. async.waterfall(
  1647. [
  1648. next => {
  1649. StationsModule.runJob("GET_STATION", { stationId }, this)
  1650. .then(station => {
  1651. next(null, station);
  1652. })
  1653. .catch(next);
  1654. },
  1655. (station, next) => {
  1656. if (!station) return next("Station not found.");
  1657. if (!station.requests.enabled) return next("Requests are disabled in this station.");
  1658. if (
  1659. station.requests.access === "owner" ||
  1660. (station.requests.access === "user" && station.privacy === "private")
  1661. ) {
  1662. return hasPermission("stations.request", session, stationId)
  1663. .then(() => next(null, station))
  1664. .catch(() => next("You do not have permission to add songs to queue."));
  1665. }
  1666. return next(null, station);
  1667. },
  1668. (station, next) =>
  1669. StationsModule.runJob(
  1670. "CAN_USER_VIEW_STATION",
  1671. {
  1672. station,
  1673. userId: session.userId
  1674. },
  1675. this
  1676. )
  1677. .then(canView => {
  1678. if (canView) return next();
  1679. return next("Insufficient permissions.");
  1680. })
  1681. .catch(err => next(err)),
  1682. next =>
  1683. StationsModule.runJob(
  1684. "ADD_TO_QUEUE",
  1685. {
  1686. stationId,
  1687. youtubeId,
  1688. requestUser: session.userId
  1689. },
  1690. this
  1691. )
  1692. .then(() => next())
  1693. .catch(next)
  1694. ],
  1695. async err => {
  1696. if (err) {
  1697. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1698. this.log(
  1699. "ERROR",
  1700. "STATIONS_ADD_SONG_TO_QUEUE",
  1701. `Adding song "${youtubeId}" to station "${stationId}" queue failed. "${err}"`
  1702. );
  1703. return cb({ status: "error", message: err });
  1704. }
  1705. this.log(
  1706. "SUCCESS",
  1707. "STATIONS_ADD_SONG_TO_QUEUE",
  1708. `Added song "${youtubeId}" to station "${stationId}" successfully.`
  1709. );
  1710. return cb({
  1711. status: "success",
  1712. message: "Successfully added song to queue."
  1713. });
  1714. }
  1715. );
  1716. }),
  1717. /**
  1718. * Removes song from station queue
  1719. *
  1720. * @param {object} session - user session
  1721. * @param {string} stationId - the station id
  1722. * @param {string} youtubeId - the youtube id
  1723. * @param {Function} cb - callback
  1724. */
  1725. async removeFromQueue(session, stationId, youtubeId, cb) {
  1726. async.waterfall(
  1727. [
  1728. next => {
  1729. hasPermission("stations.queue.remove", session, stationId)
  1730. .then(() => next())
  1731. .catch(next);
  1732. },
  1733. next => {
  1734. if (!youtubeId) return next("Invalid youtube id.");
  1735. return StationsModule.runJob("REMOVE_FROM_QUEUE", { stationId, youtubeId }, this)
  1736. .then(() => next())
  1737. .catch(next);
  1738. }
  1739. ],
  1740. async err => {
  1741. if (err) {
  1742. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1743. this.log(
  1744. "ERROR",
  1745. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1746. `Removing song "${youtubeId}" from station "${stationId}" queue failed. "${err}"`
  1747. );
  1748. return cb({ status: "error", message: err });
  1749. }
  1750. this.log(
  1751. "SUCCESS",
  1752. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1753. `Removed song "${youtubeId}" from station "${stationId}" successfully.`
  1754. );
  1755. return cb({
  1756. status: "success",
  1757. message: "Successfully removed song from queue."
  1758. });
  1759. }
  1760. );
  1761. },
  1762. /**
  1763. * Gets the queue from a station
  1764. *
  1765. * @param {object} session - user session
  1766. * @param {string} stationId - the station id
  1767. * @param {Function} cb - callback
  1768. */
  1769. getQueue(session, stationId, cb) {
  1770. async.waterfall(
  1771. [
  1772. next => {
  1773. StationsModule.runJob("GET_STATION", { stationId }, this)
  1774. .then(station => next(null, station))
  1775. .catch(next);
  1776. },
  1777. (station, next) => {
  1778. if (!station) return next("Station not found.");
  1779. return next(null, station);
  1780. },
  1781. (station, next) => {
  1782. StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  1783. .then(canView => {
  1784. if (canView) return next(null, station);
  1785. return next("Insufficient permissions.");
  1786. })
  1787. .catch(err => next(err));
  1788. },
  1789. (station, next) => next(null, station.queue)
  1790. ],
  1791. async (err, queue) => {
  1792. if (err) {
  1793. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1794. this.log(
  1795. "ERROR",
  1796. "STATIONS_GET_QUEUE",
  1797. `Getting queue for station "${stationId}" failed. "${err}"`
  1798. );
  1799. return cb({ status: "error", message: err });
  1800. }
  1801. this.log("SUCCESS", "STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  1802. return cb({
  1803. status: "success",
  1804. message: "Successfully got queue.",
  1805. data: { queue }
  1806. });
  1807. }
  1808. );
  1809. },
  1810. /**
  1811. * Reposition a song in station queue
  1812. *
  1813. * @param {object} session - user session
  1814. * @param {string} stationId - the station id
  1815. * @param {object} song - contains details about the song that is to be repositioned
  1816. * @param {string} song.youtubeId - the youtube id of the song
  1817. * @param {number} song.newIndex - the new position for the song in the queue
  1818. * @param {number} song.oldIndex - the old position of the song in the queue
  1819. * @param {Function} cb - callback
  1820. */
  1821. async repositionSongInQueue(session, stationId, song, cb) {
  1822. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1823. async.waterfall(
  1824. [
  1825. next => {
  1826. hasPermission("stations.queue.reposition", session, stationId)
  1827. .then(() => next())
  1828. .catch(next);
  1829. },
  1830. next => {
  1831. if (!song || !song.youtubeId) return next("You must provide a song to reposition.");
  1832. return next();
  1833. },
  1834. // remove song from queue
  1835. next => {
  1836. stationModel.updateOne(
  1837. { _id: stationId },
  1838. { $pull: { queue: { youtubeId: song.youtubeId } } },
  1839. next
  1840. );
  1841. },
  1842. // add song back to queue (in new position)
  1843. (res, next) => {
  1844. stationModel.updateOne(
  1845. { _id: stationId },
  1846. { $push: { queue: { $each: [song], $position: song.newIndex } } },
  1847. err => next(err)
  1848. );
  1849. },
  1850. // update the cache representation of the station
  1851. next => {
  1852. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1853. .then(station => next(null, station))
  1854. .catch(next);
  1855. }
  1856. ],
  1857. async err => {
  1858. if (err) {
  1859. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1860. this.log(
  1861. "ERROR",
  1862. "STATIONS_REPOSITION_SONG_IN_QUEUE",
  1863. `Repositioning song ${song.youtubeId} in queue of station "${stationId}" failed. "${err}"`
  1864. );
  1865. return cb({ status: "error", message: err });
  1866. }
  1867. this.log(
  1868. "SUCCESS",
  1869. "STATIONS_REPOSITION_SONG_IN_QUEUE",
  1870. `Repositioned song ${song.youtubeId} in queue of station "${stationId}" successfully.`
  1871. );
  1872. CacheModule.runJob("PUB", {
  1873. channel: "station.repositionSongInQueue",
  1874. value: {
  1875. song: {
  1876. youtubeId: song.youtubeId,
  1877. oldIndex: song.oldIndex,
  1878. newIndex: song.newIndex
  1879. },
  1880. stationId
  1881. }
  1882. });
  1883. return cb({
  1884. status: "success",
  1885. message: "Successfully repositioned song in queue."
  1886. });
  1887. }
  1888. );
  1889. },
  1890. /**
  1891. * Autofill a playlist in a station
  1892. *
  1893. * @param {object} session - user session
  1894. * @param {string} stationId - the station id
  1895. * @param {string} playlistId - the playlist id
  1896. * @param {Function} cb - callback
  1897. */
  1898. async autofillPlaylist(session, stationId, playlistId, cb) {
  1899. async.waterfall(
  1900. [
  1901. next => {
  1902. hasPermission("stations.autofill", session, stationId)
  1903. .then(() => next())
  1904. .catch(next);
  1905. },
  1906. next => {
  1907. StationsModule.runJob("GET_STATION", { stationId }, this)
  1908. .then(station => next(null, station))
  1909. .catch(next);
  1910. },
  1911. (station, next) => {
  1912. if (!station) return next("Station not found.");
  1913. if (station.autofill.playlists.indexOf(playlistId) !== -1)
  1914. return next("That playlist is already autofilling.");
  1915. if (station.autofill.mode === "sequential" && station.autofill.playlists.length > 0)
  1916. return next("Error: Only 1 playlist can be autofilling in sequential mode.");
  1917. return next();
  1918. },
  1919. next => {
  1920. StationsModule.runJob("AUTOFILL_PLAYLIST", { stationId, playlistId }, this)
  1921. .then(() => {
  1922. next();
  1923. })
  1924. .catch(next);
  1925. }
  1926. ],
  1927. async err => {
  1928. if (err) {
  1929. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1930. this.log(
  1931. "ERROR",
  1932. "STATIONS_AUTOFILL_PLAYLIST",
  1933. `Including playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  1934. );
  1935. return cb({ status: "error", message: err });
  1936. }
  1937. this.log(
  1938. "SUCCESS",
  1939. "STATIONS_AUTOFILL_PLAYLIST",
  1940. `Including playlist "${playlistId}" for station "${stationId}" successfully.`
  1941. );
  1942. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  1943. CacheModule.runJob("PUB", {
  1944. channel: "station.autofillPlaylist",
  1945. value: {
  1946. playlistId,
  1947. stationId
  1948. }
  1949. });
  1950. return cb({
  1951. status: "success",
  1952. message: "Successfully added autofill playlist."
  1953. });
  1954. }
  1955. );
  1956. },
  1957. /**
  1958. * Remove autofilled playlist from a station
  1959. *
  1960. * @param {object} session - user session
  1961. * @param {string} stationId - the station id
  1962. * @param {string} playlistId - the playlist id
  1963. * @param {Function} cb - callback
  1964. */
  1965. async removeAutofillPlaylist(session, stationId, playlistId, cb) {
  1966. async.waterfall(
  1967. [
  1968. next => {
  1969. hasPermission("stations.autofill", session, stationId)
  1970. .then(() => next())
  1971. .catch(next);
  1972. },
  1973. next => {
  1974. StationsModule.runJob("GET_STATION", { stationId }, this)
  1975. .then(station => next(null, station))
  1976. .catch(next);
  1977. },
  1978. (station, next) => {
  1979. if (!station) return next("Station not found.");
  1980. if (station.autofill.playlists.indexOf(playlistId) === -1)
  1981. return next("That playlist is not autofilling.");
  1982. return next();
  1983. },
  1984. next => {
  1985. StationsModule.runJob("REMOVE_AUTOFILL_PLAYLIST", { stationId, playlistId }, this)
  1986. .then(() => {
  1987. next();
  1988. })
  1989. .catch(next);
  1990. }
  1991. ],
  1992. async err => {
  1993. if (err) {
  1994. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1995. this.log(
  1996. "ERROR",
  1997. "STATIONS_REMOVE_AUTOFILL_PLAYLIST",
  1998. `Removing autofill playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  1999. );
  2000. return cb({ status: "error", message: err });
  2001. }
  2002. this.log(
  2003. "SUCCESS",
  2004. "STATIONS_REMOVE_AUTOFILL_PLAYLIST",
  2005. `Removing autofill playlist "${playlistId}" for station "${stationId}" successfully.`
  2006. );
  2007. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  2008. CacheModule.runJob("PUB", {
  2009. channel: "station.removedAutofillPlaylist",
  2010. value: {
  2011. playlistId,
  2012. stationId
  2013. }
  2014. });
  2015. return cb({
  2016. status: "success",
  2017. message: "Successfully removed autofill playlist."
  2018. });
  2019. }
  2020. );
  2021. },
  2022. /**
  2023. * Blacklist a playlist in a station
  2024. *
  2025. * @param {object} session - user session
  2026. * @param {string} stationId - the station id
  2027. * @param {string} playlistId - the playlist id
  2028. * @param {Function} cb - callback
  2029. */
  2030. async blacklistPlaylist(session, stationId, playlistId, cb) {
  2031. async.waterfall(
  2032. [
  2033. next => {
  2034. hasPermission("stations.blacklist", session, stationId)
  2035. .then(() => next())
  2036. .catch(next);
  2037. },
  2038. next => {
  2039. StationsModule.runJob("GET_STATION", { stationId }, this)
  2040. .then(station => next(null, station))
  2041. .catch(next);
  2042. },
  2043. (station, next) => {
  2044. if (!station) return next("Station not found.");
  2045. if (station.blacklist.indexOf(playlistId) !== -1)
  2046. return next("That playlist is already blacklisted.");
  2047. return next();
  2048. },
  2049. next => {
  2050. StationsModule.runJob("BLACKLIST_PLAYLIST", { stationId, playlistId }, this)
  2051. .then(() => {
  2052. next();
  2053. })
  2054. .catch(next);
  2055. }
  2056. ],
  2057. async err => {
  2058. if (err) {
  2059. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2060. this.log(
  2061. "ERROR",
  2062. "STATIONS_BLACKLIST_PLAYLIST",
  2063. `Blacklisting playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2064. );
  2065. return cb({ status: "error", message: err });
  2066. }
  2067. this.log(
  2068. "SUCCESS",
  2069. "STATIONS_BLACKLIST_PLAYLIST",
  2070. `Blacklisting playlist "${playlistId}" for station "${stationId}" successfully.`
  2071. );
  2072. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  2073. CacheModule.runJob("PUB", {
  2074. channel: "station.blacklistedPlaylist",
  2075. value: {
  2076. playlistId,
  2077. stationId
  2078. }
  2079. });
  2080. return cb({
  2081. status: "success",
  2082. message: "Successfully blacklisted playlist."
  2083. });
  2084. }
  2085. );
  2086. },
  2087. /**
  2088. * Remove blacklisted a playlist from a station
  2089. *
  2090. * @param {object} session - user session
  2091. * @param {string} stationId - the station id
  2092. * @param {string} playlistId - the playlist id
  2093. * @param {Function} cb - callback
  2094. */
  2095. async removeBlacklistedPlaylist(session, stationId, playlistId, cb) {
  2096. async.waterfall(
  2097. [
  2098. next => {
  2099. hasPermission("stations.blacklist", session, stationId)
  2100. .then(() => next())
  2101. .catch(next);
  2102. },
  2103. next => {
  2104. StationsModule.runJob("GET_STATION", { stationId }, this)
  2105. .then(station => next(null, station))
  2106. .catch(next);
  2107. },
  2108. (station, next) => {
  2109. if (!station) return next("Station not found.");
  2110. if (station.blacklist.indexOf(playlistId) === -1) return next("That playlist is not blacklisted.");
  2111. return next();
  2112. },
  2113. next => {
  2114. StationsModule.runJob("REMOVE_BLACKLISTED_PLAYLIST", { stationId, playlistId }, this)
  2115. .then(() => {
  2116. next();
  2117. })
  2118. .catch(next);
  2119. }
  2120. ],
  2121. async err => {
  2122. if (err) {
  2123. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2124. this.log(
  2125. "ERROR",
  2126. "STATIONS_REMOVE_BLACKLISTED_PLAYLIST",
  2127. `Removing blacklisted playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2128. );
  2129. return cb({ status: "error", message: err });
  2130. }
  2131. this.log(
  2132. "SUCCESS",
  2133. "STATIONS_REMOVE_BLACKLISTED_PLAYLIST",
  2134. `Removing blacklisted playlist "${playlistId}" for station "${stationId}" successfully.`
  2135. );
  2136. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  2137. CacheModule.runJob("PUB", {
  2138. channel: "station.removedBlacklistedPlaylist",
  2139. value: {
  2140. playlistId,
  2141. stationId
  2142. }
  2143. });
  2144. return cb({
  2145. status: "success",
  2146. message: "Successfully removed blacklisted playlist."
  2147. });
  2148. }
  2149. );
  2150. },
  2151. favoriteStation: isLoginRequired(async function favoriteStation(session, stationId, cb) {
  2152. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  2153. async.waterfall(
  2154. [
  2155. next => {
  2156. StationsModule.runJob("GET_STATION", { stationId }, this)
  2157. .then(station => next(null, station))
  2158. .catch(next);
  2159. },
  2160. (station, next) => {
  2161. if (!station) return next("Station not found.");
  2162. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  2163. .then(canView => {
  2164. if (canView) return next(null, station);
  2165. return next("Insufficient permissions.");
  2166. })
  2167. .catch(err => next(err));
  2168. },
  2169. (station, next) => {
  2170. userModel.updateOne(
  2171. { _id: session.userId },
  2172. { $addToSet: { favoriteStations: stationId } },
  2173. (err, res) => next(err, station, res)
  2174. );
  2175. },
  2176. (station, res, next) => {
  2177. if (res.nModified === 0) return next("The station was already favorited.");
  2178. return next(null, station);
  2179. }
  2180. ],
  2181. async (err, station) => {
  2182. if (err) {
  2183. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2184. this.log("ERROR", "FAVORITE_STATION", `Favoriting station "${stationId}" failed. "${err}"`);
  2185. return cb({ status: "error", message: err });
  2186. }
  2187. this.log("SUCCESS", "FAVORITE_STATION", `Favorited station "${stationId}" successfully.`);
  2188. CacheModule.runJob("PUB", {
  2189. channel: "user.favoritedStation",
  2190. value: {
  2191. userId: session.userId,
  2192. stationId
  2193. }
  2194. });
  2195. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2196. userId: session.userId,
  2197. type: "station__favorite",
  2198. payload: {
  2199. message: `Favorited station <stationId>${station.displayName}</stationId>`,
  2200. stationId
  2201. }
  2202. });
  2203. return cb({
  2204. status: "success",
  2205. message: "Succesfully favorited station."
  2206. });
  2207. }
  2208. );
  2209. }),
  2210. unfavoriteStation: isLoginRequired(async function unfavoriteStation(session, stationId, cb) {
  2211. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  2212. async.waterfall(
  2213. [
  2214. next => {
  2215. userModel.updateOne({ _id: session.userId }, { $pull: { favoriteStations: stationId } }, next);
  2216. },
  2217. (res, next) => {
  2218. if (res.nModified === 0) return next("The station wasn't favorited.");
  2219. return next();
  2220. },
  2221. next => {
  2222. StationsModule.runJob("GET_STATION", { stationId }, this)
  2223. .then(station => next(null, station))
  2224. .catch(next);
  2225. }
  2226. ],
  2227. async (err, station) => {
  2228. if (err) {
  2229. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2230. this.log("ERROR", "UNFAVORITE_STATION", `Unfavoriting station "${stationId}" failed. "${err}"`);
  2231. return cb({ status: "error", message: err });
  2232. }
  2233. this.log("SUCCESS", "UNFAVORITE_STATION", `Unfavorited station "${stationId}" successfully.`);
  2234. CacheModule.runJob("PUB", {
  2235. channel: "user.unfavoritedStation",
  2236. value: {
  2237. userId: session.userId,
  2238. stationId
  2239. }
  2240. });
  2241. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2242. userId: session.userId,
  2243. type: "station__unfavorite",
  2244. payload: {
  2245. message: `Unfavorited station <stationId>${station.displayName}</stationId>`,
  2246. stationId
  2247. }
  2248. });
  2249. return cb({
  2250. status: "success",
  2251. message: "Succesfully unfavorited station."
  2252. });
  2253. }
  2254. );
  2255. }),
  2256. /**
  2257. * Clears every station queue
  2258. *
  2259. * @param {object} session - the session object automatically added by socket.io
  2260. * @param {Function} cb - gets called with the result
  2261. */
  2262. clearEveryStationQueue: useHasPermission(
  2263. "stations.clearEveryStationQueue",
  2264. async function clearEveryStationQueue(session, cb) {
  2265. this.keepLongJob();
  2266. this.publishProgress({
  2267. status: "started",
  2268. title: "Clear every station queue",
  2269. message: "Clearing every station queue.",
  2270. id: this.toString()
  2271. });
  2272. await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
  2273. await CacheModule.runJob(
  2274. "PUB",
  2275. {
  2276. channel: "longJob.added",
  2277. value: { jobId: this.toString(), userId: session.userId }
  2278. },
  2279. this
  2280. );
  2281. async.waterfall(
  2282. [
  2283. next => {
  2284. StationsModule.runJob("CLEAR_EVERY_STATION_QUEUE", {}, this)
  2285. .then(() => next())
  2286. .catch(next);
  2287. }
  2288. ],
  2289. async err => {
  2290. if (err) {
  2291. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2292. this.log("ERROR", "CLEAR_EVERY_STATION_QUEUE", `Clearing every station queue failed. "${err}"`);
  2293. this.publishProgress({
  2294. status: "error",
  2295. message: err
  2296. });
  2297. return cb({ status: "error", message: err });
  2298. }
  2299. this.log("SUCCESS", "CLEAR_EVERY_STATION_QUEUE", "Clearing every station queue was successful.");
  2300. this.publishProgress({
  2301. status: "success",
  2302. message: "Successfully cleared every station queue."
  2303. });
  2304. return cb({ status: "success", message: "Successfully cleared every station queue." });
  2305. }
  2306. );
  2307. }
  2308. ),
  2309. /**
  2310. * Reset a station queue
  2311. *
  2312. * @param {object} session - the session object automatically added by socket.io
  2313. * @param {string} stationId - the station id
  2314. * @param {Function} cb - gets called with the result
  2315. */
  2316. async resetQueue(session, stationId, cb) {
  2317. async.waterfall(
  2318. [
  2319. next => {
  2320. hasPermission("stations.queue.reset", session, stationId)
  2321. .then(() => next())
  2322. .catch(next);
  2323. },
  2324. next => {
  2325. StationsModule.runJob("RESET_QUEUE", { stationId }, this)
  2326. .then(() => next())
  2327. .catch(next);
  2328. }
  2329. ],
  2330. async err => {
  2331. if (err) {
  2332. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2333. this.log("ERROR", "RESET_QUEUE", `Resetting station queue failed. "${err}"`);
  2334. return cb({ status: "error", message: err });
  2335. }
  2336. this.log("SUCCESS", "RESET_QUEUE", "Resetting station queue was successful.");
  2337. return cb({ status: "success", message: "Successfully reset station queue." });
  2338. }
  2339. );
  2340. },
  2341. /**
  2342. * Gets skip votes for a station
  2343. *
  2344. * @param session
  2345. * @param stationId - the station id
  2346. * @param stationId - the song id to get skipvotes for
  2347. * @param cb
  2348. */
  2349. getSkipVotes: isLoginRequired(async function getSkipVotes(session, stationId, songId, cb) {
  2350. async.waterfall(
  2351. [
  2352. next => {
  2353. StationsModule.runJob("GET_STATION", { stationId }, this)
  2354. .then(res => next(null, res.currentSong))
  2355. .catch(console.log);
  2356. },
  2357. (currentSong, next) => {
  2358. if (currentSong && currentSong._id === songId)
  2359. next(null, {
  2360. skipVotes: currentSong.skipVotes.length,
  2361. skipVotesCurrent: true,
  2362. voted: currentSong.skipVotes.indexOf(session.userId) !== -1
  2363. });
  2364. else
  2365. next(null, {
  2366. skipVotes: 0,
  2367. skipVotesCurrent: false,
  2368. voted: false
  2369. });
  2370. }
  2371. ],
  2372. async (err, data) => {
  2373. if (err) {
  2374. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2375. this.log(
  2376. "ERROR",
  2377. "STATIONS_GET_SKIP_VOTES",
  2378. `User "${session.userId}" failed to get skip votes for ${stationId}. "${err}"`
  2379. );
  2380. return cb({ status: "error", message: err });
  2381. }
  2382. return cb({
  2383. status: "success",
  2384. data
  2385. });
  2386. }
  2387. );
  2388. }),
  2389. /**
  2390. * Add DJ to station
  2391. *
  2392. * @param {object} session - the session object automatically added by socket.io
  2393. * @param {string} stationId - the station id
  2394. * @param {string} userId - the dj user id
  2395. * @param {Function} cb - gets called with the result
  2396. */
  2397. async addDj(session, stationId, userId, cb) {
  2398. async.waterfall(
  2399. [
  2400. next => {
  2401. hasPermission("stations.djs.add", session, stationId)
  2402. .then(() => next())
  2403. .catch(next);
  2404. },
  2405. next => {
  2406. StationsModule.runJob("ADD_DJ", { stationId, userId }, this)
  2407. .then(() => next())
  2408. .catch(next);
  2409. }
  2410. ],
  2411. async err => {
  2412. if (err) {
  2413. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2414. this.log("ERROR", "ADD_DJ", `Adding DJ failed. "${err}"`);
  2415. return cb({ status: "error", message: err });
  2416. }
  2417. this.log("SUCCESS", "ADD_DJ", "Adding DJ was successful.");
  2418. return cb({ status: "success", message: "Successfully added DJ." });
  2419. }
  2420. );
  2421. },
  2422. /**
  2423. * Remove DJ from station
  2424. *
  2425. * @param {object} session - the session object automatically added by socket.io
  2426. * @param {string} stationId - the station id
  2427. * @param {string} userId - the dj user id
  2428. * @param {Function} cb - gets called with the result
  2429. */
  2430. async removeDj(session, stationId, userId, cb) {
  2431. async.waterfall(
  2432. [
  2433. next => {
  2434. hasPermission("stations.djs.remove", session, stationId)
  2435. .then(() => next())
  2436. .catch(next);
  2437. },
  2438. next => {
  2439. StationsModule.runJob("REMOVE_DJ", { stationId, userId }, this)
  2440. .then(() => next())
  2441. .catch(next);
  2442. }
  2443. ],
  2444. async err => {
  2445. if (err) {
  2446. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2447. this.log("ERROR", "REMOVE_DJ", `Removing DJ failed. "${err}"`);
  2448. return cb({ status: "error", message: err });
  2449. }
  2450. this.log("SUCCESS", "REMOVE_DJ", "Removing DJ was successful.");
  2451. return cb({ status: "success", message: "Successfully removed DJ." });
  2452. }
  2453. );
  2454. }
  2455. };