stations.js 70 KB


  1. import async from "async";
  2. import mongoose from "mongoose";
  3. import config from "config";
  4. import { hasPermission, useHasPermission } from "../hooks/hasPermission";
  5. import isLoginRequired from "../hooks/loginRequired";
  6. // eslint-disable-next-line
  7. import moduleManager from "../../index";
  // Handles to the backend modules used by the station actions and cache subscriptions below.
  const {
    db: DBModule,
    utils: UtilsModule,
    ws: WSModule,
    playlists: PlaylistsModule,
    cache: CacheModule,
    notifications: NotificationsModule,
    stations: StationsModule,
    activities: ActivitiesModule
  } = moduleManager.modules;
  // Cache subscription: forward a station's refreshed user list to the sockets in that station's room.
  CacheModule.runJob("SUB", {
    channel: "station.updateUsers",
    cb: payload => {
      const { stationId, usersPerStation: users } = payload;
      const room = `station.${stationId}`;

      WSModule.runJob("EMIT_TO_ROOM", {
        room,
        args: ["event:station.users.updated", { data: { users } }]
      });
    }
  });
  /*
   * Cache subscription: the number of users in a station changed.
   *
   * The station's own room always receives the new count. The "home" room receives it directly when
   * the station is public; otherwise every home socket is checked for "stations.view" on the station
   * and only the permitted ones are notified.
   */
  CacheModule.runJob("SUB", {
    channel: "station.updateUserCount",
    cb: ({ stationId, usersPerStationCount }) => {
      const count = usersPerStationCount || 0;

      WSModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:station.userCount.updated", { data: { userCount: count } }]
      });

      StationsModule.runJob("GET_STATION", { stationId })
        .then(async station => {
          if (station.privacy === "public") {
            WSModule.runJob("EMIT_TO_ROOM", {
              room: "home",
              args: ["event:station.userCount.updated", { data: { stationId, userCount: count } }]
            });
            return;
          }

          const sockets = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: "home" });

          // Promise.all instead of forEach(async ...): a failing socket lookup stays attached to
          // this chain and reaches the catch below instead of becoming an unhandled rejection.
          await Promise.all(
            Array.from(sockets, async socketId => {
              const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId });
              if (!socket) return;

              // A rejected permission check only means "this socket may not see the station".
              const allowed = await hasPermission("stations.view", socket.session, stationId).then(
                () => true,
                () => false
              );
              if (!allowed) return;

              // Same payload key ("userCount") as the public-station broadcast above.
              socket.dispatch("event:station.userCount.updated", {
                data: { stationId, userCount: count }
              });
            })
          );
        })
        .catch(err => {
          console.error(`Failed to broadcast the user count of station "${stationId}" to the home room.`, err);
        });
    }
  });
  // Cache subscription: a playlist was added to a station's autofill list. The full playlist is
  // loaded and sent to both the station room and the manage-station room.
  CacheModule.runJob("SUB", {
    channel: "station.autofillPlaylist",
    cb: data => {
      const { stationId, playlistId } = data;

      PlaylistsModule.runJob("GET_PLAYLIST", { playlistId })
        .then(playlist =>
          WSModule.runJob("EMIT_TO_ROOMS", {
            rooms: [`station.${stationId}`, `manage-station.${stationId}`],
            args: ["event:station.autofillPlaylist", { data: { stationId, playlist } }]
          })
        )
        // Without a handler a failed playlist lookup would surface as an unhandled rejection.
        .catch(err => {
          console.error(
            `Failed to announce autofill playlist "${playlistId}" for station "${stationId}".`,
            err
          );
        });
    }
  });
  // Cache subscription: a playlist was added to a station's blacklist. The full playlist is loaded
  // and sent to both the station room and the manage-station room.
  CacheModule.runJob("SUB", {
    channel: "station.blacklistedPlaylist",
    cb: data => {
      const { stationId, playlistId } = data;

      PlaylistsModule.runJob("GET_PLAYLIST", { playlistId })
        .then(playlist =>
          WSModule.runJob("EMIT_TO_ROOMS", {
            rooms: [`station.${stationId}`, `manage-station.${stationId}`],
            args: ["event:station.blacklistedPlaylist", { data: { stationId, playlist } }]
          })
        )
        // Without a handler a failed playlist lookup would surface as an unhandled rejection.
        .catch(err => {
          console.error(
            `Failed to announce blacklisted playlist "${playlistId}" for station "${stationId}".`,
            err
          );
        });
    }
  });
  // Cache subscription: a playlist was removed from a station's autofill list; only the ids are relayed.
  CacheModule.runJob("SUB", {
    channel: "station.removedAutofillPlaylist",
    cb: ({ stationId, playlistId }) => {
      const rooms = [`station.${stationId}`, `manage-station.${stationId}`];
      const payload = { data: { stationId, playlistId } };

      WSModule.runJob("EMIT_TO_ROOMS", {
        rooms,
        args: ["event:station.removedAutofillPlaylist", payload]
      });
    }
  });
  // Cache subscription: a playlist was removed from a station's blacklist; only the ids are relayed.
  CacheModule.runJob("SUB", {
    channel: "station.removedBlacklistedPlaylist",
    cb: ({ stationId, playlistId }) => {
      const rooms = [`station.${stationId}`, `manage-station.${stationId}`];
      const payload = { data: { stationId, playlistId } };

      WSModule.runJob("EMIT_TO_ROOMS", {
        rooms,
        args: ["event:station.removedBlacklistedPlaylist", payload]
      });
    }
  });
  /*
   * Cache subscription: a station was paused.
   *
   * The station and manage-station rooms receive the pause timestamp; home sockets that are allowed
   * to know about the station only receive the station id.
   */
  CacheModule.runJob("SUB", {
    channel: "station.pause",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId })
        .then(station => {
          WSModule.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:station.pause", { data: { pausedAt: station.pausedAt } }]
          });
          WSModule.runJob("EMIT_TO_ROOM", {
            room: `manage-station.${stationId}`,
            args: ["event:station.pause", { data: { stationId, pausedAt: station.pausedAt } }]
          });

          // Returned so that a failure of the socket lookup reaches the catch below.
          return StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
            room: `home`,
            station
          });
        })
        .then(({ socketsThatCan }) => {
          socketsThatCan.forEach(socket => {
            socket.dispatch("event:station.pause", { data: { stationId } });
          });
        })
        .catch(err => {
          console.error(`Failed to broadcast pause of station "${stationId}".`, err);
        });
    }
  });
  /*
   * Cache subscription: a station was resumed.
   *
   * The station and manage-station rooms receive the accumulated paused time; home sockets that are
   * allowed to know about the station only receive the station id.
   */
  CacheModule.runJob("SUB", {
    channel: "station.resume",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId })
        .then(station => {
          WSModule.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:station.resume", { data: { timePaused: station.timePaused } }]
          });
          WSModule.runJob("EMIT_TO_ROOM", {
            room: `manage-station.${stationId}`,
            args: ["event:station.resume", { data: { stationId, timePaused: station.timePaused } }]
          });

          // Returned so that one catch covers both the station lookup and the socket lookup.
          return StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
            room: `home`,
            station
          });
        })
        .then(({ socketsThatCan }) => {
          socketsThatCan.forEach(socket => {
            socket.dispatch("event:station.resume", { data: { stationId } });
          });
        })
        .catch(err => {
          console.error(`Failed to broadcast resume of station "${stationId}".`, err);
        });
    }
  });
  /*
   * Cache subscription: a station's queue changed.
   *
   * If the station has no current song but the queue is non-empty, the station is (re)initialized.
   * The new queue is then sent to the station room and the manage-station room.
   */
  CacheModule.runJob("SUB", {
    channel: "station.queueUpdate",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId })
        .then(station => {
          if (!station.currentSong && station.queue.length > 0) {
            // Fire-and-forget as before, but a failure is now reported instead of left unhandled.
            StationsModule.runJob("INITIALIZE_STATION", { stationId }).catch(err => {
              console.error(`Failed to initialize station "${stationId}" after a queue update.`, err);
            });
          }

          WSModule.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:station.queue.updated", { data: { queue: station.queue } }]
          });
          WSModule.runJob("EMIT_TO_ROOM", {
            room: `manage-station.${stationId}`,
            args: ["event:manageStation.queue.updated", { data: { stationId, queue: station.queue } }]
          });
        })
        .catch(err => {
          console.error(`Failed to broadcast queue update of station "${stationId}".`, err);
        });
    }
  });
  // Cache subscription: a queued song was moved; tell the station room and the manage-station room.
  CacheModule.runJob("SUB", {
    channel: "station.repositionSongInQueue",
    cb: res => {
      const { stationId, song } = res;

      WSModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:station.queue.song.repositioned", { data: { song } }]
      });

      // The manage-station event additionally carries the station id.
      WSModule.runJob("EMIT_TO_ROOM", {
        room: `manage-station.${stationId}`,
        args: ["event:manageStation.queue.song.repositioned", { data: { stationId, song } }]
      });
    }
  });
  // Cache subscription: a user toggled their skip vote; relay who voted and the new state to the station room.
  CacheModule.runJob("SUB", {
    channel: "station.toggleSkipVote",
    cb: ({ stationId, voted, userId }) => {
      const room = `station.${stationId}`;
      const payload = { data: { voted, userId } };

      WSModule.runJob("EMIT_TO_ROOM", { room, args: ["event:station.toggleSkipVote", payload] });
    }
  });
  // Cache subscription: a station was deleted. The station and manage-station rooms get a bare
  // event; the home and admin rooms also get the id of the deleted station.
  CacheModule.runJob("SUB", {
    channel: "station.remove",
    cb: stationId => {
      const withId = { data: { stationId } };
      const emissions = [
        { room: `station.${stationId}`, args: ["event:station.deleted"] },
        { room: `manage-station.${stationId}`, args: ["event:station.deleted"] },
        { room: `home`, args: ["event:station.deleted", withId] },
        { room: "admin.stations", args: ["event:admin.station.deleted", { data: { stationId } }] }
      ];

      emissions.forEach(emission => {
        WSModule.runJob("EMIT_TO_ROOM", emission);
      });
    }
  });
  /*
   * Cache subscription: a station was created.
   *
   * The station is initialized, annotated with its current user count and announced to the admin
   * room. The "home" room is told directly when the station is public; otherwise each home socket is
   * checked for "stations.view" on the station and only the permitted ones are notified.
   */
  CacheModule.runJob("SUB", {
    channel: "station.create",
    cb: async stationId => {
      StationsModule.runJob("INITIALIZE_STATION", { stationId })
        .then(async res => {
          const { station } = res;
          station.userCount = StationsModule.usersPerStationCount[stationId] || 0;

          WSModule.runJob("EMIT_TO_ROOM", {
            room: "admin.stations",
            args: ["event:admin.station.created", { data: { station } }]
          });

          if (station.privacy === "public") {
            WSModule.runJob("EMIT_TO_ROOM", {
              room: "home",
              args: ["event:station.created", { data: { station } }]
            });
            return;
          }

          const sockets = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: "home" });

          // Promise.all instead of forEach(async ...): a failing socket lookup stays attached to
          // this chain and reaches the catch below instead of becoming an unhandled rejection.
          await Promise.all(
            Array.from(sockets, async socketId => {
              const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId });
              if (!socket) return;

              // A rejected permission check only means "this socket may not see the station".
              const allowed = await hasPermission("stations.view", socket.session, stationId).then(
                () => true,
                () => false
              );
              if (allowed) socket.dispatch("event:station.created", { data: { station } });
            })
          );
        })
        .catch(err => {
          console.error(`Failed to announce creation of station "${stationId}".`, err);
        });
    }
  });
  /*
   * Cache subscription: a station's settings were updated.
   *
   * A reduced projection of the station is loaded and sent to the station, manage-station and admin
   * rooms, and to every home socket allowed to know about the station. When the privacy changed:
   *  - to "public": the whole home room is told the station was created;
   *  - away from "public": home sockets that may no longer know about it are told it was deleted.
   */
  CacheModule.runJob("SUB", {
    channel: "station.updated",
    cb: async data => {
      const { stationId, previousStation } = data;

      let stationModel;
      try {
        stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" });
      } catch (err) {
        console.error(`Failed to load the station model while broadcasting update of "${stationId}".`, err);
        return;
      }

      stationModel.findOne(
        { _id: stationId },
        ["_id", "name", "displayName", "description", "type", "privacy", "owner", "requests", "autofill", "theme"],
        (err, station) => {
          // Bail out before dereferencing: a query error or a station deleted in the meantime
          // would otherwise throw inside this callback.
          if (err || !station) {
            console.error(
              `Failed to load station "${stationId}" while broadcasting its update.`,
              err || "Station not found."
            );
            return;
          }

          const privacyChanged = Boolean(previousStation) && previousStation.privacy !== station.privacy;
          const becamePublic = privacyChanged && station.privacy === "public";
          const becameHidden = privacyChanged && !becamePublic && previousStation.privacy === "public";

          WSModule.runJob("EMIT_TO_ROOMS", {
            rooms: [`station.${stationId}`, `manage-station.${stationId}`, "admin.stations"],
            args: ["event:station.updated", { data: { station } }]
          });

          // One lookup yields both partitions of the home room, so the same job does not have to
          // run twice with an identical payload.
          StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
            room: `home`,
            station
          })
            .then(({ socketsThatCan, socketsThatCannot }) => {
              socketsThatCan.forEach(socket => {
                socket.dispatch("event:station.updated", { data: { station } });
              });

              if (becameHidden) {
                // Station became hidden
                socketsThatCannot.forEach(socket => {
                  socket.dispatch("event:station.deleted", { data: { stationId } });
                });
              }
            })
            .catch(lookupErr => {
              console.error(`Failed to notify home sockets about update of station "${stationId}".`, lookupErr);
            });

          if (becamePublic) {
            // Station became public
            WSModule.runJob("EMIT_TO_ROOM", {
              room: "home",
              args: ["event:station.created", { data: { station } }]
            });
          }
        }
      );
    }
  });
  301. export default {
  302. /**
  303. * Get a list of all the stations
  304. *
  305. * @param {object} session - user session
  306. * @param {boolean} adminFilter - whether to filter out stations admins do not own
  307. * @param {Function} cb - callback
  308. */
  309. async index(session, adminFilter, cb) {
  310. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  311. async.waterfall(
  312. [
  313. // get array of the ids of the user's favorite stations
  314. next => {
  315. if (session.userId)
  316. return userModel.findById(session.userId).select({ favoriteStations: -1 }).exec(next);
  317. return next(null, { favoriteStations: [] });
  318. },
  319. (user, next) => {
  320. const favoriteStations = user ? user.favoriteStations : [];
  321. CacheModule.runJob("HGETALL", { table: "stations" }, this).then(stations =>
  322. next(null, stations, favoriteStations)
  323. );
  324. },
  325. (stations, favorited, next) => {
  326. const filteredStations = [];
  327. async.eachLimit(
  328. stations,
  329. 1,
  330. (station, nextStation) => {
  331. async.waterfall(
  332. [
  333. callback => {
  334. // only relevant if user logged in
  335. if (session.userId) {
  336. if (favorited.indexOf(station._id) !== -1) station.isFavorited = true;
  337. return callback();
  338. }
  339. return callback();
  340. },
  341. callback => {
  342. StationsModule.runJob(
  343. "CAN_USER_VIEW_STATION",
  344. {
  345. station,
  346. userId: session.userId
  347. },
  348. this
  349. )
  350. .then(exists => callback(null, exists))
  351. .catch(callback);
  352. },
  353. (exists, callback) => {
  354. if (!exists) callback(null, false, false);
  355. else if (station.privacy === "public") callback(null, true, true);
  356. else
  357. hasPermission("stations.index", session.userId, station._id)
  358. .then(() => callback(null, true, true))
  359. .catch(() => callback(null, true, false));
  360. },
  361. (exists, canIndex, callback) => {
  362. if (!exists) callback(null, false);
  363. else if (!canIndex && !adminFilter)
  364. hasPermission("stations.index.other", session.userId)
  365. .then(() => callback(null, true))
  366. .catch(() => callback(null, false));
  367. else callback(null, canIndex);
  368. }
  369. ],
  370. (err, exists) => {
  371. if (err) return this.log("ERROR", "STATIONS_INDEX", err);
  372. station.userCount = StationsModule.usersPerStationCount[station._id] || 0;
  373. if (exists) filteredStations.push(station);
  374. return nextStation();
  375. }
  376. );
  377. },
  378. () => next(null, filteredStations, favorited)
  379. );
  380. }
  381. ],
  382. async (err, stations, favorited) => {
  383. if (err) {
  384. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  385. this.log("ERROR", "STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  386. return cb({ status: "error", message: err });
  387. }
  388. this.log("SUCCESS", "STATIONS_INDEX", `Indexing stations successful.`, false);
  389. return cb({ status: "success", data: { stations, favorited } });
  390. }
  391. );
  392. },
  393. /**
  394. * Gets stations, used in the admin stations page by the AdvancedTable component
  395. *
  396. * @param {object} session - the session object automatically added by the websocket
  397. * @param page - the page
  398. * @param pageSize - the size per page
  399. * @param properties - the properties to return for each station
  400. * @param sort - the sort object
  401. * @param queries - the queries array
  402. * @param operator - the operator for queries
  403. * @param cb
  404. */
  405. getData: useHasPermission(
  406. "admin.view.stations",
  407. async function getSet(session, page, pageSize, properties, sort, queries, operator, cb) {
  408. async.waterfall(
  409. [
  410. next => {
  411. DBModule.runJob(
  412. "GET_DATA",
  413. {
  414. page,
  415. pageSize,
  416. properties,
  417. sort,
  418. queries,
  419. operator,
  420. modelName: "station",
  421. blacklistedProperties: [],
  422. specialProperties: {
  423. owner: [
  424. {
  425. $addFields: {
  426. ownerOID: {
  427. $convert: {
  428. input: "$owner",
  429. to: "objectId",
  430. onError: "unknown",
  431. onNull: "unknown"
  432. }
  433. }
  434. }
  435. },
  436. {
  437. $lookup: {
  438. from: "users",
  439. localField: "ownerOID",
  440. foreignField: "_id",
  441. as: "ownerUser"
  442. }
  443. },
  444. {
  445. $unwind: {
  446. path: "$ownerUser",
  447. preserveNullAndEmptyArrays: true
  448. }
  449. },
  450. {
  451. $addFields: {
  452. ownerUsername: {
  453. $cond: [
  454. { $eq: [{ $type: "$owner" }, "string"] },
  455. { $ifNull: ["$ownerUser.username", "unknown"] },
  456. "none"
  457. ]
  458. }
  459. }
  460. },
  461. {
  462. $project: {
  463. ownerOID: 0,
  464. ownerUser: 0
  465. }
  466. }
  467. ]
  468. },
  469. specialQueries: {
  470. owner: newQuery => ({ $or: [newQuery, { ownerUsername: newQuery.owner }] })
  471. }
  472. },
  473. this
  474. )
  475. .then(response => {
  476. next(null, response);
  477. })
  478. .catch(err => {
  479. next(err);
  480. });
  481. }
  482. ],
  483. async (err, response) => {
  484. if (err) {
  485. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  486. this.log("ERROR", "STATIONS_GET_DATA", `Failed to get data from stations. "${err}"`);
  487. return cb({ status: "error", message: err });
  488. }
  489. this.log("SUCCESS", "STATIONS_GET_DATA", `Got data from stations successfully.`);
  490. return cb({ status: "success", message: "Successfully got data from stations.", data: response });
  491. }
  492. );
  493. }
  494. ),
  495. /**
  496. * Obtains basic metadata of a station in order to format an activity
  497. *
  498. * @param {object} session - user session
  499. * @param {string} stationId - the station id
  500. * @param {Function} cb - callback
  501. */
  502. getStationForActivity(session, stationId, cb) {
  503. async.waterfall(
  504. [
  505. next => {
  506. StationsModule.runJob("GET_STATION", { stationId }, this)
  507. .then(station => {
  508. next(null, station);
  509. })
  510. .catch(next);
  511. }
  512. ],
  513. async (err, station) => {
  514. if (err) {
  515. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  516. this.log(
  517. "ERROR",
  518. "STATIONS_GET_STATION_FOR_ACTIVITY",
  519. `Failed to obtain metadata of station ${stationId} for activity formatting. "${err}"`
  520. );
  521. return cb({ status: "error", message: err });
  522. }
  523. this.log(
  524. "SUCCESS",
  525. "STATIONS_GET_STATION_FOR_ACTIVITY",
  526. `Obtained metadata of station ${stationId} for activity formatting successfully.`
  527. );
  528. return cb({
  529. status: "success",
  530. data: {
  531. title: station.displayName,
  532. thumbnail: station.currentSong ? station.currentSong.thumbnail : ""
  533. }
  534. });
  535. }
  536. );
  537. },
  538. /**
  539. * Verifies that a station exists from its name
  540. *
  541. * @param {object} session - user session
  542. * @param {string} stationName - the station name
  543. * @param {Function} cb - callback
  544. */
  545. existsByName(session, stationName, cb) {
  546. async.waterfall(
  547. [
  548. next => {
  549. StationsModule.runJob("GET_STATION_BY_NAME", { stationName }, this)
  550. .then(station => next(null, station))
  551. .catch(next);
  552. },
  553. (station, next) => {
  554. if (!station) return next(null, false);
  555. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  556. .then(exists => next(null, exists))
  557. .catch(next);
  558. }
  559. ],
  560. async (err, exists) => {
  561. if (err) {
  562. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  563. this.log(
  564. "ERROR",
  565. "STATION_EXISTS_BY_NAME",
  566. `Checking if station "${stationName}" exists failed. "${err}"`
  567. );
  568. return cb({ status: "error", message: err });
  569. }
  570. this.log(
  571. "SUCCESS",
  572. "STATION_EXISTS_BY_NAME",
  573. `Station "${stationName}" exists successfully.` /* , false */
  574. );
  575. return cb({ status: "success", data: { exists } });
  576. }
  577. );
  578. },
  579. /**
  580. * Verifies that a station exists from its id
  581. *
  582. * @param {object} session - user session
  583. * @param {string} stationId - the station id
  584. * @param {Function} cb - callback
  585. */
  586. existsById(session, stationId, cb) {
  587. async.waterfall(
  588. [
  589. next => {
  590. StationsModule.runJob("GET_STATION", { stationId }, this)
  591. .then(station => next(null, station))
  592. .catch(next);
  593. },
  594. (station, next) => {
  595. if (!station) return next(null, false);
  596. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  597. .then(exists => next(null, exists))
  598. .catch(next);
  599. }
  600. ],
  601. async (err, exists) => {
  602. if (err) {
  603. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  604. this.log(
  605. "ERROR",
  606. "STATION_EXISTS_BY_ID",
  607. `Checking if station "${stationId}" exists failed. "${err}"`
  608. );
  609. return cb({ status: "error", message: err });
  610. }
  611. this.log(
  612. "SUCCESS",
  613. "STATION_EXISTS_BY_ID",
  614. `Station "${stationId}" exists successfully.` /* , false */
  615. );
  616. return cb({ status: "success", data: { exists } });
  617. }
  618. );
  619. },
  620. /**
  621. * Gets the official playlist for a station
  622. *
  623. * @param {object} session - user session
  624. * @param {string} stationId - the station id
  625. * @param {Function} cb - callback
  626. */
  627. getPlaylist(session, stationId, cb) {
  628. async.waterfall(
  629. [
  630. next => {
  631. StationsModule.runJob("GET_STATION", { stationId }, this)
  632. .then(station => {
  633. next(null, station);
  634. })
  635. .catch(next);
  636. },
  637. (station, next) => {
  638. StationsModule.runJob(
  639. "CAN_USER_VIEW_STATION",
  640. {
  641. station,
  642. userId: session.userId
  643. },
  644. this
  645. )
  646. .then(canView => {
  647. if (canView) return next(null, station);
  648. return next("Insufficient permissions.");
  649. })
  650. .catch(err => next(err));
  651. },
  652. (station, next) => {
  653. if (!station) return next("Station not found.");
  654. if (station.type !== "official") return next("This is not an official station.");
  655. return next();
  656. },
  657. next => {
  658. CacheModule.runJob(
  659. "HGET",
  660. {
  661. table: "officialPlaylists",
  662. key: stationId
  663. },
  664. this
  665. )
  666. .then(playlist => {
  667. next(null, playlist);
  668. })
  669. .catch(next);
  670. },
  671. (playlist, next) => {
  672. if (!playlist) return next("Playlist not found.");
  673. return next(null, playlist);
  674. }
  675. ],
  676. async (err, playlist) => {
  677. if (err) {
  678. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  679. this.log(
  680. "ERROR",
  681. "STATIONS_GET_PLAYLIST",
  682. `Getting playlist for station "${stationId}" failed. "${err}"`
  683. );
  684. return cb({ status: "error", message: err });
  685. }
  686. this.log(
  687. "SUCCESS",
  688. "STATIONS_GET_PLAYLIST",
  689. `Got playlist for station "${stationId}" successfully.`,
  690. false
  691. );
  692. return cb({ status: "success", data: { songs: playlist.songs } });
  693. }
  694. );
  695. },
  696. /**
  697. * Joins the station by its name
  698. *
  699. * @param {object} session - user session
  700. * @param {string} stationIdentifier - the station name or station id
  701. * @param {Function} cb - callback
  702. */
  703. async join(session, stationIdentifier, cb) {
  704. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  705. async.waterfall(
  706. [
  707. next => {
  708. StationsModule.runJob("GET_STATION_BY_NAME", { stationName: stationIdentifier }, this)
  709. .then(station => next(null, station))
  710. .catch(() =>
  711. // station identifier may be using stationid instead
  712. StationsModule.runJob("GET_STATION", { stationId: stationIdentifier }, this)
  713. .then(station => next(null, station))
  714. .catch(next)
  715. );
  716. },
  717. (station, next) => {
  718. if (!station) return next("Station not found.");
  719. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  720. .then(canView => {
  721. if (!canView) next("Not allowed to join station.");
  722. else next(null, station);
  723. })
  724. .catch(err => next(err));
  725. },
  726. (station, next) => {
  727. WSModule.runJob("SOCKET_JOIN_ROOM", {
  728. socketId: session.socketId,
  729. room: `station.${station._id}`
  730. });
  731. const data = {
  732. _id: station._id,
  733. type: station.type,
  734. currentSong: station.currentSong,
  735. startedAt: station.startedAt,
  736. paused: station.paused,
  737. timePaused: station.timePaused,
  738. pausedAt: station.pausedAt,
  739. description: station.description,
  740. displayName: station.displayName,
  741. name: station.name,
  742. privacy: station.privacy,
  743. requests: station.requests,
  744. autofill: station.autofill,
  745. owner: station.owner,
  746. blacklist: station.blacklist,
  747. theme: station.theme,
  748. djs: station.djs
  749. };
  750. StationsModule.userList[session.socketId] = station._id;
  751. next(null, data);
  752. },
  753. (data, next) => {
  754. userModel.find({ _id: { $in: data.djs } }, (err, users) => {
  755. if (err) next(err);
  756. else {
  757. data.djs = users.map(user => {
  758. const { _id, name, username, avatar } = user._doc;
  759. return { _id, name, username, avatar };
  760. });
  761. next(null, data);
  762. }
  763. });
  764. },
  765. (data, next) => {
  766. data = JSON.parse(JSON.stringify(data));
  767. data.userCount = StationsModule.usersPerStationCount[data._id] || 0;
  768. data.users = StationsModule.usersPerStation[data._id] || [];
  769. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  770. WSModule.runJob("SOCKET_JOIN_SONG_ROOM", {
  771. socketId: session.socketId,
  772. room: `song.${data.currentSong.youtubeId}`
  773. });
  774. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  775. return next(null, data);
  776. },
  777. (data, next) => {
  778. // only relevant if user logged in
  779. if (session.userId) {
  780. return StationsModule.runJob(
  781. "HAS_USER_FAVORITED_STATION",
  782. {
  783. userId: session.userId,
  784. stationId: data._id
  785. },
  786. this
  787. )
  788. .then(isStationFavorited => {
  789. data.isFavorited = isStationFavorited;
  790. return next(null, data);
  791. })
  792. .catch(err => next(err));
  793. }
  794. return next(null, data);
  795. }
  796. ],
  797. async (err, data) => {
  798. if (err) {
  799. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  800. this.log("ERROR", "STATIONS_JOIN", `Joining station "${stationIdentifier}" failed. "${err}"`);
  801. return cb({ status: "error", message: err });
  802. }
  803. this.log("SUCCESS", "STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  804. return cb({ status: "success", data });
  805. }
  806. );
  807. },
  808. /**
  809. * Gets a station by id
  810. *
  811. * @param {object} session - user session
  812. * @param {string} stationId - the station id
  813. * @param {Function} cb - callback
  814. */
  815. getStationById(session, stationId, cb) {
  816. async.waterfall(
  817. [
  818. next => {
  819. StationsModule.runJob("GET_STATION", { stationId }, this)
  820. .then(station => {
  821. next(null, station);
  822. })
  823. .catch(next);
  824. },
  825. (station, next) => {
  826. if (!station) return next("Station not found.");
  827. return StationsModule.runJob(
  828. "CAN_USER_VIEW_STATION",
  829. {
  830. station,
  831. userId: session.userId
  832. },
  833. this
  834. )
  835. .then(canView => {
  836. if (!canView) next("Not allowed to get station.");
  837. else next(null, station);
  838. })
  839. .catch(err => next(err));
  840. },
  841. (station, next) => {
  842. // only relevant if user logged in
  843. if (session.userId) {
  844. return StationsModule.runJob(
  845. "HAS_USER_FAVORITED_STATION",
  846. {
  847. userId: session.userId,
  848. stationId
  849. },
  850. this
  851. )
  852. .then(isStationFavorited => {
  853. station.isFavorited = isStationFavorited;
  854. return next(null, station);
  855. })
  856. .catch(err => next(err));
  857. }
  858. return next(null, station);
  859. },
  860. (station, next) => {
  861. const data = {
  862. _id: station._id,
  863. type: station.type,
  864. description: station.description,
  865. displayName: station.displayName,
  866. name: station.name,
  867. privacy: station.privacy,
  868. requests: station.requests,
  869. autofill: station.autofill,
  870. owner: station.owner,
  871. theme: station.theme,
  872. paused: station.paused,
  873. currentSong: station.currentSong,
  874. isFavorited: station.isFavorited,
  875. djs: station.djs
  876. };
  877. next(null, data);
  878. }
  879. ],
  880. async (err, data) => {
  881. if (err) {
  882. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  883. this.log("ERROR", "GET_STATION_BY_ID", `Getting station "${stationId}" failed. "${err}"`);
  884. return cb({ status: "error", message: err });
  885. }
  886. this.log("SUCCESS", "GET_STATION_BY_ID", `Got station "${stationId}" successfully.`);
  887. return cb({ status: "success", data: { station: data } });
  888. }
  889. );
  890. },
  891. getStationAutofillPlaylistsById(session, stationId, cb) {
  892. async.waterfall(
  893. [
  894. next => {
  895. StationsModule.runJob("GET_STATION", { stationId }, this)
  896. .then(station => {
  897. next(null, station);
  898. })
  899. .catch(next);
  900. },
  901. (station, next) => {
  902. if (!station) return next("Station not found.");
  903. return StationsModule.runJob(
  904. "CAN_USER_VIEW_STATION",
  905. {
  906. station,
  907. userId: session.userId
  908. },
  909. this
  910. )
  911. .then(canView => {
  912. if (!canView) next("Not allowed to get station.");
  913. else next(null, station);
  914. })
  915. .catch(err => next(err));
  916. },
  917. (station, next) => {
  918. const playlists = [];
  919. async.eachLimit(
  920. station.autofill.playlists,
  921. 1,
  922. (playlistId, next) => {
  923. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
  924. .then(playlist => {
  925. playlists.push(playlist);
  926. next();
  927. })
  928. .catch(() => {
  929. playlists.push(null);
  930. next();
  931. });
  932. },
  933. err => {
  934. next(err, playlists);
  935. }
  936. );
  937. }
  938. ],
  939. async (err, playlists) => {
  940. if (err) {
  941. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  942. this.log(
  943. "ERROR",
  944. "GET_STATION_AUTOFILL_PLAYLISTS_BY_ID",
  945. `Getting station "${stationId}"'s autofilling playlists failed. "${err}"`
  946. );
  947. return cb({ status: "error", message: err });
  948. }
  949. this.log(
  950. "SUCCESS",
  951. "GET_STATION_AUTOFILL_PLAYLISTS_BY_ID",
  952. `Got station "${stationId}"'s autofilling playlists successfully.`
  953. );
  954. return cb({ status: "success", data: { playlists } });
  955. }
  956. );
  957. },
  958. getStationBlacklistById(session, stationId, cb) {
  959. async.waterfall(
  960. [
  961. next => {
  962. StationsModule.runJob("GET_STATION", { stationId }, this)
  963. .then(station => {
  964. next(null, station);
  965. })
  966. .catch(next);
  967. },
  968. (station, next) => {
  969. if (!station) return next("Station not found.");
  970. return StationsModule.runJob(
  971. "CAN_USER_VIEW_STATION",
  972. {
  973. station,
  974. userId: session.userId
  975. },
  976. this
  977. )
  978. .then(canView => {
  979. if (!canView) next("Not allowed to get station.");
  980. else next(null, station);
  981. })
  982. .catch(err => next(err));
  983. },
  984. (station, next) => {
  985. const playlists = [];
  986. async.eachLimit(
  987. station.blacklist,
  988. 1,
  989. (playlistId, next) => {
  990. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
  991. .then(playlist => {
  992. playlists.push(playlist);
  993. next();
  994. })
  995. .catch(() => {
  996. playlists.push(null);
  997. next();
  998. });
  999. },
  1000. err => {
  1001. next(err, playlists);
  1002. }
  1003. );
  1004. }
  1005. ],
  1006. async (err, playlists) => {
  1007. if (err) {
  1008. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1009. this.log(
  1010. "ERROR",
  1011. "GET_STATION_BLACKLIST_BY_ID",
  1012. `Getting station "${stationId}"'s blacklist failed. "${err}"`
  1013. );
  1014. return cb({ status: "error", message: err });
  1015. }
  1016. this.log(
  1017. "SUCCESS",
  1018. "GET_STATION_BLACKLIST_BY_ID",
  1019. `Got station "${stationId}"'s blacklist successfully.`
  1020. );
  1021. return cb({ status: "success", data: { playlists } });
  1022. }
  1023. );
  1024. },
  1025. /**
  1026. * Toggle votes to skip a station
  1027. *
  1028. * @param session
  1029. * @param stationId - the station id
  1030. * @param cb
  1031. */
  1032. toggleSkipVote: isLoginRequired(async function toggleSkipVote(session, stationId, cb) {
  1033. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1034. async.waterfall(
  1035. [
  1036. next => {
  1037. StationsModule.runJob("GET_STATION", { stationId }, this)
  1038. .then(station => next(null, station))
  1039. .catch(next);
  1040. },
  1041. (station, next) => {
  1042. if (!station) return next("Station not found.");
  1043. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  1044. .then(canView => {
  1045. if (canView) return next(null, station);
  1046. return next("Insufficient permissions.");
  1047. })
  1048. .catch(err => next(err));
  1049. },
  1050. (station, next) => {
  1051. if (!station.currentSong) return next("There is currently no song to skip.");
  1052. const query = {};
  1053. const voted = station.currentSong.skipVotes.indexOf(session.userId) !== -1;
  1054. if (!voted) query.$push = { "currentSong.skipVotes": session.userId };
  1055. else query.$pull = { "currentSong.skipVotes": session.userId };
  1056. return stationModel.updateOne({ _id: stationId }, query, err => {
  1057. if (err) next(err);
  1058. else next(null, !voted);
  1059. });
  1060. },
  1061. (voted, next) => {
  1062. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1063. .then(station => {
  1064. next(null, station, voted);
  1065. })
  1066. .catch(next);
  1067. },
  1068. (station, voted, next) => {
  1069. if (!station) return next("Station not found.");
  1070. return StationsModule.runJob("PROCESS_SKIP_VOTES", { stationId }, this)
  1071. .then(() => next(null, voted))
  1072. .catch(next);
  1073. }
  1074. ],
  1075. async (err, voted) => {
  1076. if (err) {
  1077. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1078. this.log(
  1079. "ERROR",
  1080. "STATIONS_TOGGLE_SKIP_VOTE",
  1081. `Toggling skip vote on "${stationId}" failed. "${err}"`
  1082. );
  1083. return cb({ status: "error", message: err });
  1084. }
  1085. this.log("SUCCESS", "STATIONS_TOGGLE_SKIP_VOTE", `Toggling skip vote on "${stationId}" successful.`);
  1086. CacheModule.runJob("PUB", {
  1087. channel: "station.toggleSkipVote",
  1088. value: { stationId, voted, userId: session.userId }
  1089. });
  1090. return cb({
  1091. status: "success",
  1092. message: voted
  1093. ? "Successfully voted to skip the song."
  1094. : "Successfully removed vote to skip the song.",
  1095. data: { voted }
  1096. });
  1097. }
  1098. );
  1099. }),
  1100. /**
  1101. * Force skips a station
  1102. *
  1103. * @param session
  1104. * @param stationId - the station id
  1105. * @param cb
  1106. */
  1107. forceSkip(session, stationId, cb) {
  1108. async.waterfall(
  1109. [
  1110. next => {
  1111. hasPermission("stations.skip", session, stationId)
  1112. .then(() => next())
  1113. .catch(next);
  1114. },
  1115. next => {
  1116. StationsModule.runJob("GET_STATION", { stationId }, this)
  1117. .then(station => {
  1118. next(null, station);
  1119. })
  1120. .catch(next);
  1121. },
  1122. (station, next) => {
  1123. if (!station) return next("Station not found.");
  1124. return next();
  1125. }
  1126. ],
  1127. async err => {
  1128. if (err) {
  1129. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1130. this.log("ERROR", "STATIONS_FORCE_SKIP", `Force skipping station "${stationId}" failed. "${err}"`);
  1131. return cb({ status: "error", message: err });
  1132. }
  1133. StationsModule.runJob("SKIP_STATION", { stationId, natural: false });
  1134. this.log("SUCCESS", "STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  1135. return cb({
  1136. status: "success",
  1137. message: "Successfully skipped station."
  1138. });
  1139. }
  1140. );
  1141. },
  1142. /**
  1143. * Leaves the user's current station
  1144. *
  1145. * @param {object} session - user session
  1146. * @param {string} stationId - id of station to leave
  1147. * @param {Function} cb - callback
  1148. */
  1149. leave(session, stationId, cb) {
  1150. async.waterfall(
  1151. [
  1152. next => {
  1153. StationsModule.runJob("GET_STATION", { stationId }, this)
  1154. .then(station => next(null, station))
  1155. .catch(next);
  1156. },
  1157. (station, next) => {
  1158. if (!station) return next("Station not found.");
  1159. return next();
  1160. }
  1161. ],
  1162. async (err, userCount) => {
  1163. if (err) {
  1164. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1165. this.log("ERROR", "STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  1166. return cb({ status: "error", message: err });
  1167. }
  1168. this.log("SUCCESS", "STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  1169. WSModule.runJob("SOCKET_LEAVE_ROOM", { socketId: session.socketId, room: `station.${stationId}` });
  1170. WSModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", { sockets: [session.socketId] });
  1171. delete StationsModule.userList[session.socketId];
  1172. return cb({
  1173. status: "success",
  1174. message: "Successfully left station.",
  1175. data: { userCount }
  1176. });
  1177. }
  1178. );
  1179. },
  1180. /**
  1181. * Updates a station's settings
  1182. *
  1183. * @param session
  1184. * @param stationId - the station id
  1185. * @param station - updated station object
  1186. * @param newStation
  1187. * @param cb
  1188. */
  1189. async update(session, stationId, newStation, cb) {
  1190. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1191. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  1192. async.waterfall(
  1193. [
  1194. next => {
  1195. hasPermission("stations.update", session, stationId)
  1196. .then(() => next())
  1197. .catch(next);
  1198. },
  1199. next => {
  1200. stationModel.findOne({ _id: stationId }, next);
  1201. },
  1202. (previousStation, next) => {
  1203. const { name, displayName, description, privacy, requests, autofill, theme } = newStation;
  1204. const { enabled, limit, mode } = autofill;
  1205. // This object makes sure only certain properties can be changed by a user
  1206. const setObject = {
  1207. name,
  1208. displayName,
  1209. description,
  1210. privacy,
  1211. requests,
  1212. "autofill.enabled": enabled,
  1213. "autofill.limit": limit,
  1214. "autofill.mode": mode,
  1215. theme
  1216. };
  1217. stationModel.updateOne({ _id: stationId }, { $set: setObject }, { runValidators: true }, err => {
  1218. next(err, previousStation);
  1219. });
  1220. },
  1221. (previousStation, next) => {
  1222. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1223. .then(station => next(null, station, previousStation))
  1224. .catch(next);
  1225. },
  1226. (station, previousStation, next) => {
  1227. if (
  1228. newStation.autofill.enabled &&
  1229. JSON.stringify(newStation.autofill) !== JSON.stringify(previousStation.autofill)
  1230. )
  1231. StationsModule.runJob("AUTOFILL_STATION", { stationId }, this)
  1232. .then(() => {
  1233. CacheModule.runJob("PUB", {
  1234. channel: "station.queueUpdate",
  1235. value: stationId
  1236. })
  1237. .then(() => next(null, station, previousStation))
  1238. .catch(next);
  1239. })
  1240. .catch(err => {
  1241. if (err === "Autofill is disabled in this station" || err === "Autofill limit reached")
  1242. next(null, station, previousStation);
  1243. else next(err);
  1244. });
  1245. else next(null, station, previousStation);
  1246. },
  1247. (station, previousStation, next) => {
  1248. playlistModel.updateOne(
  1249. { _id: station.playlist },
  1250. { $set: { displayName: `Station - ${station.displayName}` } },
  1251. err => {
  1252. next(err, station, previousStation);
  1253. }
  1254. );
  1255. }
  1256. ],
  1257. async (err, station, previousStation) => {
  1258. if (err) {
  1259. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1260. this.log("ERROR", "STATIONS_UPDATE", `Updating station "${stationId}" failed. "${err}"`);
  1261. return cb({ status: "error", message: err });
  1262. }
  1263. this.log("SUCCESS", "STATIONS_UPDATE", `Updated station "${stationId}" successfully.`);
  1264. CacheModule.runJob("PUB", {
  1265. channel: "station.updated",
  1266. value: { stationId, previousStation }
  1267. });
  1268. return cb({
  1269. status: "success",
  1270. message: "Successfully updated the station."
  1271. });
  1272. }
  1273. );
  1274. },
  1275. /**
  1276. * Pauses a station
  1277. *
  1278. * @param session
  1279. * @param stationId - the station id
  1280. * @param cb
  1281. */
  1282. async pause(session, stationId, cb) {
  1283. const stationModel = await DBModule.runJob(
  1284. "GET_MODEL",
  1285. {
  1286. modelName: "station"
  1287. },
  1288. this
  1289. );
  1290. async.waterfall(
  1291. [
  1292. next => {
  1293. hasPermission("stations.playback.toggle", session, stationId)
  1294. .then(() => next())
  1295. .catch(next);
  1296. },
  1297. next => {
  1298. StationsModule.runJob("GET_STATION", { stationId }, this)
  1299. .then(station => {
  1300. next(null, station);
  1301. })
  1302. .catch(next);
  1303. },
  1304. (station, next) => {
  1305. if (!station) return next("Station not found.");
  1306. if (station.paused) return next("That station was already paused.");
  1307. return stationModel.updateOne(
  1308. { _id: stationId },
  1309. { $set: { paused: true, pausedAt: Date.now() } },
  1310. next
  1311. );
  1312. },
  1313. (res, next) => {
  1314. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1315. .then(() => next())
  1316. .catch(next);
  1317. }
  1318. ],
  1319. async err => {
  1320. if (err) {
  1321. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1322. this.log("ERROR", "STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  1323. return cb({ status: "error", message: err });
  1324. }
  1325. this.log("SUCCESS", "STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  1326. CacheModule.runJob("PUB", {
  1327. channel: "station.pause",
  1328. value: stationId
  1329. });
  1330. NotificationsModule.runJob("UNSCHEDULE", {
  1331. name: `stations.nextSong?id=${stationId}`
  1332. });
  1333. return cb({
  1334. status: "success",
  1335. message: "Successfully paused."
  1336. });
  1337. }
  1338. );
  1339. },
  1340. /**
  1341. * Resumes a station
  1342. *
  1343. * @param session
  1344. * @param stationId - the station id
  1345. * @param cb
  1346. */
  1347. async resume(session, stationId, cb) {
  1348. const stationModel = await DBModule.runJob(
  1349. "GET_MODEL",
  1350. {
  1351. modelName: "station"
  1352. },
  1353. this
  1354. );
  1355. async.waterfall(
  1356. [
  1357. next => {
  1358. hasPermission("stations.playback.toggle", session, stationId)
  1359. .then(() => next())
  1360. .catch(next);
  1361. },
  1362. next => {
  1363. StationsModule.runJob("GET_STATION", { stationId }, this)
  1364. .then(station => {
  1365. next(null, station);
  1366. })
  1367. .catch(next);
  1368. },
  1369. (station, next) => {
  1370. if (!station) return next("Station not found.");
  1371. if (!station.paused) return next("That station is not paused.");
  1372. station.timePaused += Date.now() - station.pausedAt;
  1373. return stationModel.updateOne(
  1374. { _id: stationId },
  1375. {
  1376. $set: { paused: false },
  1377. $inc: { timePaused: Date.now() - station.pausedAt }
  1378. },
  1379. next
  1380. );
  1381. },
  1382. (res, next) => {
  1383. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1384. .then(() => next())
  1385. .catch(next);
  1386. },
  1387. next => {
  1388. StationsModule.runJob("PROCESS_SKIP_VOTES", { stationId }, this)
  1389. .then(() => next())
  1390. .catch(next);
  1391. }
  1392. ],
  1393. async err => {
  1394. if (err) {
  1395. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1396. this.log("ERROR", "STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  1397. return cb({ status: "error", message: err });
  1398. }
  1399. this.log("SUCCESS", "STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  1400. CacheModule.runJob("PUB", {
  1401. channel: "station.resume",
  1402. value: stationId
  1403. });
  1404. return cb({
  1405. status: "success",
  1406. message: "Successfully resumed."
  1407. });
  1408. }
  1409. );
  1410. },
  1411. /**
  1412. * Removes a station
  1413. *
  1414. * @param session
  1415. * @param stationId - the station id
  1416. * @param cb
  1417. */
  1418. async remove(session, stationId, cb) {
  1419. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1420. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  1421. async.waterfall(
  1422. [
  1423. next => {
  1424. hasPermission("stations.remove", session, stationId)
  1425. .then(() => next())
  1426. .catch(next);
  1427. },
  1428. next => {
  1429. stationModel.findById(stationId, (err, station) => {
  1430. if (err) return next(err);
  1431. return next(null, station);
  1432. });
  1433. },
  1434. (station, next) => {
  1435. stationModel.deleteOne({ _id: stationId }, err => next(err, station));
  1436. },
  1437. (station, next) => {
  1438. CacheModule.runJob("HDEL", { table: "stations", key: stationId }, this)
  1439. .then(() => next(null, station))
  1440. .catch(next);
  1441. },
  1442. // remove the playlist for the station
  1443. (station, next) => {
  1444. if (station.playlist)
  1445. PlaylistsModule.runJob("DELETE_PLAYLIST", { playlistId: station.playlist })
  1446. .then(() => {})
  1447. .catch(next);
  1448. next(null, station);
  1449. },
  1450. // remove reference to the station id in any array of a user's favorite stations
  1451. (station, next) => {
  1452. userModel.updateMany(
  1453. { favoriteStations: stationId },
  1454. { $pull: { favoriteStations: stationId } },
  1455. err => next(err, station)
  1456. );
  1457. }
  1458. ],
  1459. async (err, station) => {
  1460. if (err) {
  1461. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1462. this.log("ERROR", "STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  1463. return cb({ status: "error", message: err });
  1464. }
  1465. this.log("SUCCESS", "STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  1466. CacheModule.runJob("PUB", {
  1467. channel: "station.remove",
  1468. value: stationId
  1469. });
  1470. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1471. userId: session.userId,
  1472. type: "station__remove",
  1473. payload: { message: `Removed a station named ${station.displayName}` }
  1474. });
  1475. ActivitiesModule.runJob("REMOVE_ACTIVITY_REFERENCES", { type: "stationId", stationId });
  1476. return cb({
  1477. status: "success",
  1478. message: "Successfully removed."
  1479. });
  1480. }
  1481. );
  1482. },
  1483. /**
  1484. * Create a station
  1485. *
  1486. * @param session
  1487. * @param data - the station data
  1488. * @param cb
  1489. */
  1490. create: isLoginRequired(async function create(session, data, cb) {
  1491. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1492. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  1493. data.name = data.name.toLowerCase();
  1494. let blacklist = [
  1495. "about",
  1496. "support",
  1497. "staff",
  1498. "help",
  1499. "news",
  1500. "terms",
  1501. "privacy",
  1502. "profile",
  1503. "c",
  1504. "community",
  1505. "tos",
  1506. "login",
  1507. "register",
  1508. "p",
  1509. "official",
  1510. "o",
  1511. "faq",
  1512. "team",
  1513. "donate",
  1514. "buy",
  1515. "shop",
  1516. "forums",
  1517. "explore",
  1518. "settings",
  1519. "admin",
  1520. "auth",
  1521. "reset_password",
  1522. "backend",
  1523. "api",
  1524. "songs",
  1525. "playlists",
  1526. "playlist",
  1527. "albums",
  1528. "artists",
  1529. "artist",
  1530. "station"
  1531. ];
  1532. if (data.type === "community" && config.has("blacklistedCommunityStationNames"))
  1533. blacklist = [...blacklist, ...config.get("blacklistedCommunityStationNames")];
  1534. async.waterfall(
  1535. [
  1536. next => {
  1537. if (!data) return next("Invalid data.");
  1538. return next();
  1539. },
  1540. next => {
  1541. stationModel.findOne(
  1542. {
  1543. $or: [{ name: data.name }, { displayName: new RegExp(`^${data.displayName}$`, "i") }]
  1544. },
  1545. next
  1546. );
  1547. },
  1548. (station, next) => {
  1549. this.log(station);
  1550. if (station) return next("A station with that name or display name already exists.");
  1551. if (blacklist.indexOf(data.name) !== -1)
  1552. return next("That name is blacklisted. Please use a different name.");
  1553. if (data.type === "official")
  1554. return hasPermission("stations.create.official", session)
  1555. .then(() => next())
  1556. .catch(() => next("Insufficient permissions."));
  1557. return next();
  1558. },
  1559. next => {
  1560. const stationId = mongoose.Types.ObjectId();
  1561. playlistModel.create(
  1562. {
  1563. displayName: `Station - ${data.name}`,
  1564. songs: [],
  1565. createdBy: data.type === "official" ? "Musare" : session.userId,
  1566. createdFor: `${stationId}`,
  1567. createdAt: Date.now(),
  1568. type: "station"
  1569. },
  1570. (err, playlist) => {
  1571. next(err, playlist, stationId);
  1572. }
  1573. );
  1574. },
  1575. (playlist, stationId, next) => {
  1576. const { name, displayName, description, type } = data;
  1577. if (type === "official") {
  1578. stationModel.create(
  1579. {
  1580. _id: stationId,
  1581. name,
  1582. displayName,
  1583. description,
  1584. playlist: playlist._id,
  1585. type,
  1586. privacy: "private",
  1587. queue: [],
  1588. currentSong: null
  1589. },
  1590. next
  1591. );
  1592. } else {
  1593. stationModel.create(
  1594. {
  1595. _id: stationId,
  1596. name,
  1597. displayName,
  1598. description,
  1599. playlist: playlist._id,
  1600. type,
  1601. privacy: "private",
  1602. owner: session.userId,
  1603. queue: [],
  1604. currentSong: null
  1605. },
  1606. next
  1607. );
  1608. }
  1609. }
  1610. ],
  1611. async (err, station) => {
  1612. if (err) {
  1613. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1614. this.log("ERROR", "STATIONS_CREATE", `Creating station failed. "${err}"`);
  1615. cb({ status: "error", message: err });
  1616. } else {
  1617. this.log("SUCCESS", "STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  1618. CacheModule.runJob("PUB", {
  1619. channel: "station.create",
  1620. value: station._id
  1621. });
  1622. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1623. userId: session.userId,
  1624. type: "station__create",
  1625. payload: {
  1626. message: `Created a station named <stationId>${station.displayName}</stationId>`,
  1627. stationId: station._id
  1628. }
  1629. });
  1630. cb({
  1631. status: "success",
  1632. message: "Successfully created station."
  1633. });
  1634. }
  1635. }
  1636. );
  1637. }),
  1638. /**
  1639. * Adds song to station queue
  1640. *
  1641. * @param session
  1642. * @param stationId - the station id
  1643. * @param youtubeId - the song id
  1644. * @param cb
  1645. */
  1646. addToQueue: isLoginRequired(async function addToQueue(session, stationId, youtubeId, cb) {
  1647. async.waterfall(
  1648. [
  1649. next => {
  1650. StationsModule.runJob("GET_STATION", { stationId }, this)
  1651. .then(station => {
  1652. next(null, station);
  1653. })
  1654. .catch(next);
  1655. },
  1656. (station, next) => {
  1657. if (!station) return next("Station not found.");
  1658. if (!station.requests.enabled) return next("Requests are disabled in this station.");
  1659. if (
  1660. station.requests.access === "owner" ||
  1661. (station.requests.access === "user" && station.privacy === "private")
  1662. ) {
  1663. return hasPermission("stations.request", session, stationId)
  1664. .then(() => next(null, station))
  1665. .catch(() => next("You do not have permission to add songs to queue."));
  1666. }
  1667. return next(null, station);
  1668. },
  1669. (station, next) =>
  1670. StationsModule.runJob(
  1671. "CAN_USER_VIEW_STATION",
  1672. {
  1673. station,
  1674. userId: session.userId
  1675. },
  1676. this
  1677. )
  1678. .then(canView => {
  1679. if (canView) return next();
  1680. return next("Insufficient permissions.");
  1681. })
  1682. .catch(err => next(err)),
  1683. next =>
  1684. StationsModule.runJob(
  1685. "ADD_TO_QUEUE",
  1686. {
  1687. stationId,
  1688. youtubeId,
  1689. requestUser: session.userId
  1690. },
  1691. this
  1692. )
  1693. .then(() => next())
  1694. .catch(next)
  1695. ],
  1696. async err => {
  1697. if (err) {
  1698. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1699. this.log(
  1700. "ERROR",
  1701. "STATIONS_ADD_SONG_TO_QUEUE",
  1702. `Adding song "${youtubeId}" to station "${stationId}" queue failed. "${err}"`
  1703. );
  1704. return cb({ status: "error", message: err });
  1705. }
  1706. this.log(
  1707. "SUCCESS",
  1708. "STATIONS_ADD_SONG_TO_QUEUE",
  1709. `Added song "${youtubeId}" to station "${stationId}" successfully.`
  1710. );
  1711. return cb({
  1712. status: "success",
  1713. message: "Successfully added song to queue."
  1714. });
  1715. }
  1716. );
  1717. }),
  1718. /**
  1719. * Removes song from station queue
  1720. *
  1721. * @param session
  1722. * @param stationId - the station id
  1723. * @param youtubeId - the youtube id
  1724. * @param cb
  1725. */
  1726. async removeFromQueue(session, stationId, youtubeId, cb) {
  1727. async.waterfall(
  1728. [
  1729. next => {
  1730. hasPermission("stations.queue.remove", session, stationId)
  1731. .then(() => next())
  1732. .catch(next);
  1733. },
  1734. next => {
  1735. if (!youtubeId) return next("Invalid youtube id.");
  1736. return StationsModule.runJob("REMOVE_FROM_QUEUE", { stationId, youtubeId }, this)
  1737. .then(() => next())
  1738. .catch(next);
  1739. }
  1740. ],
  1741. async err => {
  1742. if (err) {
  1743. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1744. this.log(
  1745. "ERROR",
  1746. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1747. `Removing song "${youtubeId}" from station "${stationId}" queue failed. "${err}"`
  1748. );
  1749. return cb({ status: "error", message: err });
  1750. }
  1751. this.log(
  1752. "SUCCESS",
  1753. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1754. `Removed song "${youtubeId}" from station "${stationId}" successfully.`
  1755. );
  1756. return cb({
  1757. status: "success",
  1758. message: "Successfully removed song from queue."
  1759. });
  1760. }
  1761. );
  1762. },
  1763. /**
  1764. * Gets the queue from a station
  1765. *
  1766. * @param {object} session - user session
  1767. * @param {string} stationId - the station id
  1768. * @param {Function} cb - callback
  1769. */
  1770. getQueue(session, stationId, cb) {
  1771. async.waterfall(
  1772. [
  1773. next => {
  1774. StationsModule.runJob("GET_STATION", { stationId }, this)
  1775. .then(station => next(null, station))
  1776. .catch(next);
  1777. },
  1778. (station, next) => {
  1779. if (!station) return next("Station not found.");
  1780. return next(null, station);
  1781. },
  1782. (station, next) => {
  1783. StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  1784. .then(canView => {
  1785. if (canView) return next(null, station);
  1786. return next("Insufficient permissions.");
  1787. })
  1788. .catch(err => next(err));
  1789. },
  1790. (station, next) => next(null, station.queue)
  1791. ],
  1792. async (err, queue) => {
  1793. if (err) {
  1794. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1795. this.log(
  1796. "ERROR",
  1797. "STATIONS_GET_QUEUE",
  1798. `Getting queue for station "${stationId}" failed. "${err}"`
  1799. );
  1800. return cb({ status: "error", message: err });
  1801. }
  1802. this.log("SUCCESS", "STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  1803. return cb({
  1804. status: "success",
  1805. message: "Successfully got queue.",
  1806. data: { queue }
  1807. });
  1808. }
  1809. );
  1810. },
  1811. /**
  1812. * Reposition a song in station queue
  1813. *
  1814. * @param {object} session - user session
  1815. * @param {object} song - contains details about the song that is to be repositioned
  1816. * @param {string} song.youtubeId - the youtube id of the song
  1817. * @param {number} song.newIndex - the new position for the song in the queue
  1818. * @param {number} song.oldIndex - the old position of the song in the queue
  1819. * @param {string} stationId - the station id
  1820. * @param {Function} cb - callback
  1821. */
  1822. async repositionSongInQueue(session, stationId, song, cb) {
  1823. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1824. async.waterfall(
  1825. [
  1826. next => {
  1827. hasPermission("stations.queue.reposition", session, stationId)
  1828. .then(() => next())
  1829. .catch(next);
  1830. },
  1831. next => {
  1832. if (!song || !song.youtubeId) return next("You must provide a song to reposition.");
  1833. return next();
  1834. },
  1835. // remove song from queue
  1836. next => {
  1837. stationModel.updateOne(
  1838. { _id: stationId },
  1839. { $pull: { queue: { youtubeId: song.youtubeId } } },
  1840. next
  1841. );
  1842. },
  1843. // add song back to queue (in new position)
  1844. (res, next) => {
  1845. stationModel.updateOne(
  1846. { _id: stationId },
  1847. { $push: { queue: { $each: [song], $position: song.newIndex } } },
  1848. err => next(err)
  1849. );
  1850. },
  1851. // update the cache representation of the station
  1852. next => {
  1853. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1854. .then(station => next(null, station))
  1855. .catch(next);
  1856. }
  1857. ],
  1858. async err => {
  1859. if (err) {
  1860. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1861. this.log(
  1862. "ERROR",
  1863. "STATIONS_REPOSITION_SONG_IN_QUEUE",
  1864. `Repositioning song ${song.youtubeId} in queue of station "${stationId}" failed. "${err}"`
  1865. );
  1866. return cb({ status: "error", message: err });
  1867. }
  1868. this.log(
  1869. "SUCCESS",
  1870. "STATIONS_REPOSITION_SONG_IN_QUEUE",
  1871. `Repositioned song ${song.youtubeId} in queue of station "${stationId}" successfully.`
  1872. );
  1873. CacheModule.runJob("PUB", {
  1874. channel: "station.repositionSongInQueue",
  1875. value: {
  1876. song: {
  1877. youtubeId: song.youtubeId,
  1878. oldIndex: song.oldIndex,
  1879. newIndex: song.newIndex
  1880. },
  1881. stationId
  1882. }
  1883. });
  1884. return cb({
  1885. status: "success",
  1886. message: "Successfully repositioned song in queue."
  1887. });
  1888. }
  1889. );
  1890. },
  1891. /**
  1892. * Autofill a playlist in a station
  1893. *
  1894. * @param session
  1895. * @param stationId - the station id
  1896. * @param playlistId - the playlist id
  1897. * @param cb
  1898. */
  1899. async autofillPlaylist(session, stationId, playlistId, cb) {
  1900. async.waterfall(
  1901. [
  1902. next => {
  1903. hasPermission("stations.autofill", session, stationId)
  1904. .then(() => next())
  1905. .catch(next);
  1906. },
  1907. next => {
  1908. StationsModule.runJob("GET_STATION", { stationId }, this)
  1909. .then(station => next(null, station))
  1910. .catch(next);
  1911. },
  1912. (station, next) => {
  1913. if (!station) return next("Station not found.");
  1914. if (station.autofill.playlists.indexOf(playlistId) !== -1)
  1915. return next("That playlist is already autofilling.");
  1916. if (station.autofill.mode === "sequential" && station.autofill.playlists.length > 0)
  1917. return next("Error: Only 1 playlist can be autofilling in sequential mode.");
  1918. return next();
  1919. },
  1920. next => {
  1921. StationsModule.runJob("AUTOFILL_PLAYLIST", { stationId, playlistId }, this)
  1922. .then(() => {
  1923. next();
  1924. })
  1925. .catch(next);
  1926. }
  1927. ],
  1928. async err => {
  1929. if (err) {
  1930. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1931. this.log(
  1932. "ERROR",
  1933. "STATIONS_AUTOFILL_PLAYLIST",
  1934. `Including playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  1935. );
  1936. return cb({ status: "error", message: err });
  1937. }
  1938. this.log(
  1939. "SUCCESS",
  1940. "STATIONS_AUTOFILL_PLAYLIST",
  1941. `Including playlist "${playlistId}" for station "${stationId}" successfully.`
  1942. );
  1943. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  1944. CacheModule.runJob("PUB", {
  1945. channel: "station.autofillPlaylist",
  1946. value: {
  1947. playlistId,
  1948. stationId
  1949. }
  1950. });
  1951. return cb({
  1952. status: "success",
  1953. message: "Successfully added autofill playlist."
  1954. });
  1955. }
  1956. );
  1957. },
  1958. /**
  1959. * Remove autofilled playlist from a station
  1960. *
  1961. * @param session
  1962. * @param stationId - the station id
  1963. * @param playlistId - the playlist id
  1964. * @param cb
  1965. */
  1966. async removeAutofillPlaylist(session, stationId, playlistId, cb) {
  1967. async.waterfall(
  1968. [
  1969. next => {
  1970. hasPermission("stations.autofill", session, stationId)
  1971. .then(() => next())
  1972. .catch(next);
  1973. },
  1974. next => {
  1975. StationsModule.runJob("GET_STATION", { stationId }, this)
  1976. .then(station => next(null, station))
  1977. .catch(next);
  1978. },
  1979. (station, next) => {
  1980. if (!station) return next("Station not found.");
  1981. if (station.autofill.playlists.indexOf(playlistId) === -1)
  1982. return next("That playlist is not autofilling.");
  1983. return next();
  1984. },
  1985. next => {
  1986. StationsModule.runJob("REMOVE_AUTOFILL_PLAYLIST", { stationId, playlistId }, this)
  1987. .then(() => {
  1988. next();
  1989. })
  1990. .catch(next);
  1991. }
  1992. ],
  1993. async err => {
  1994. if (err) {
  1995. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1996. this.log(
  1997. "ERROR",
  1998. "STATIONS_REMOVE_AUTOFILL_PLAYLIST",
  1999. `Removing autofill playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2000. );
  2001. return cb({ status: "error", message: err });
  2002. }
  2003. this.log(
  2004. "SUCCESS",
  2005. "STATIONS_REMOVE_AUTOFILL_PLAYLIST",
  2006. `Removing autofill playlist "${playlistId}" for station "${stationId}" successfully.`
  2007. );
  2008. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  2009. CacheModule.runJob("PUB", {
  2010. channel: "station.removedAutofillPlaylist",
  2011. value: {
  2012. playlistId,
  2013. stationId
  2014. }
  2015. });
  2016. return cb({
  2017. status: "success",
  2018. message: "Successfully removed autofill playlist."
  2019. });
  2020. }
  2021. );
  2022. },
  2023. /**
  2024. * Blacklist a playlist in a station
  2025. *
  2026. * @param session
  2027. * @param stationId - the station id
  2028. * @param playlistId - the playlist id
  2029. * @param cb
  2030. */
  2031. async blacklistPlaylist(session, stationId, playlistId, cb) {
  2032. async.waterfall(
  2033. [
  2034. next => {
  2035. hasPermission("stations.blacklist", session, stationId)
  2036. .then(() => next())
  2037. .catch(next);
  2038. },
  2039. next => {
  2040. StationsModule.runJob("GET_STATION", { stationId }, this)
  2041. .then(station => next(null, station))
  2042. .catch(next);
  2043. },
  2044. (station, next) => {
  2045. if (!station) return next("Station not found.");
  2046. if (station.blacklist.indexOf(playlistId) !== -1)
  2047. return next("That playlist is already blacklisted.");
  2048. return next();
  2049. },
  2050. next => {
  2051. StationsModule.runJob("BLACKLIST_PLAYLIST", { stationId, playlistId }, this)
  2052. .then(() => {
  2053. next();
  2054. })
  2055. .catch(next);
  2056. }
  2057. ],
  2058. async err => {
  2059. if (err) {
  2060. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2061. this.log(
  2062. "ERROR",
  2063. "STATIONS_BLACKLIST_PLAYLIST",
  2064. `Blacklisting playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2065. );
  2066. return cb({ status: "error", message: err });
  2067. }
  2068. this.log(
  2069. "SUCCESS",
  2070. "STATIONS_BLACKLIST_PLAYLIST",
  2071. `Blacklisting playlist "${playlistId}" for station "${stationId}" successfully.`
  2072. );
  2073. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  2074. CacheModule.runJob("PUB", {
  2075. channel: "station.blacklistedPlaylist",
  2076. value: {
  2077. playlistId,
  2078. stationId
  2079. }
  2080. });
  2081. return cb({
  2082. status: "success",
  2083. message: "Successfully blacklisted playlist."
  2084. });
  2085. }
  2086. );
  2087. },
  2088. /**
  2089. * Remove blacklisted a playlist from a station
  2090. *
  2091. * @param session
  2092. * @param stationId - the station id
  2093. * @param playlistId - the playlist id
  2094. * @param cb
  2095. */
  2096. async removeBlacklistedPlaylist(session, stationId, playlistId, cb) {
  2097. async.waterfall(
  2098. [
  2099. next => {
  2100. hasPermission("stations.blacklist", session, stationId)
  2101. .then(() => next())
  2102. .catch(next);
  2103. },
  2104. next => {
  2105. StationsModule.runJob("GET_STATION", { stationId }, this)
  2106. .then(station => next(null, station))
  2107. .catch(next);
  2108. },
  2109. (station, next) => {
  2110. if (!station) return next("Station not found.");
  2111. if (station.blacklist.indexOf(playlistId) === -1) return next("That playlist is not blacklisted.");
  2112. return next();
  2113. },
  2114. next => {
  2115. StationsModule.runJob("REMOVE_BLACKLISTED_PLAYLIST", { stationId, playlistId }, this)
  2116. .then(() => {
  2117. next();
  2118. })
  2119. .catch(next);
  2120. }
  2121. ],
  2122. async err => {
  2123. if (err) {
  2124. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2125. this.log(
  2126. "ERROR",
  2127. "STATIONS_REMOVE_BLACKLISTED_PLAYLIST",
  2128. `Removing blacklisted playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2129. );
  2130. return cb({ status: "error", message: err });
  2131. }
  2132. this.log(
  2133. "SUCCESS",
  2134. "STATIONS_REMOVE_BLACKLISTED_PLAYLIST",
  2135. `Removing blacklisted playlist "${playlistId}" for station "${stationId}" successfully.`
  2136. );
  2137. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  2138. CacheModule.runJob("PUB", {
  2139. channel: "station.removedBlacklistedPlaylist",
  2140. value: {
  2141. playlistId,
  2142. stationId
  2143. }
  2144. });
  2145. return cb({
  2146. status: "success",
  2147. message: "Successfully removed blacklisted playlist."
  2148. });
  2149. }
  2150. );
  2151. },
  2152. favoriteStation: isLoginRequired(async function favoriteStation(session, stationId, cb) {
  2153. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  2154. async.waterfall(
  2155. [
  2156. next => {
  2157. StationsModule.runJob("GET_STATION", { stationId }, this)
  2158. .then(station => next(null, station))
  2159. .catch(next);
  2160. },
  2161. (station, next) => {
  2162. if (!station) return next("Station not found.");
  2163. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  2164. .then(canView => {
  2165. if (canView) return next(null, station);
  2166. return next("Insufficient permissions.");
  2167. })
  2168. .catch(err => next(err));
  2169. },
  2170. (station, next) => {
  2171. userModel.updateOne(
  2172. { _id: session.userId },
  2173. { $addToSet: { favoriteStations: stationId } },
  2174. (err, res) => next(err, station, res)
  2175. );
  2176. },
  2177. (station, res, next) => {
  2178. if (res.nModified === 0) return next("The station was already favorited.");
  2179. return next(null, station);
  2180. }
  2181. ],
  2182. async (err, station) => {
  2183. if (err) {
  2184. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2185. this.log("ERROR", "FAVORITE_STATION", `Favoriting station "${stationId}" failed. "${err}"`);
  2186. return cb({ status: "error", message: err });
  2187. }
  2188. this.log("SUCCESS", "FAVORITE_STATION", `Favorited station "${stationId}" successfully.`);
  2189. CacheModule.runJob("PUB", {
  2190. channel: "user.favoritedStation",
  2191. value: {
  2192. userId: session.userId,
  2193. stationId
  2194. }
  2195. });
  2196. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2197. userId: session.userId,
  2198. type: "station__favorite",
  2199. payload: {
  2200. message: `Favorited station <stationId>${station.displayName}</stationId>`,
  2201. stationId
  2202. }
  2203. });
  2204. return cb({
  2205. status: "success",
  2206. message: "Succesfully favorited station."
  2207. });
  2208. }
  2209. );
  2210. }),
  2211. unfavoriteStation: isLoginRequired(async function unfavoriteStation(session, stationId, cb) {
  2212. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  2213. async.waterfall(
  2214. [
  2215. next => {
  2216. userModel.updateOne({ _id: session.userId }, { $pull: { favoriteStations: stationId } }, next);
  2217. },
  2218. (res, next) => {
  2219. if (res.nModified === 0) return next("The station wasn't favorited.");
  2220. return next();
  2221. },
  2222. next => {
  2223. StationsModule.runJob("GET_STATION", { stationId }, this)
  2224. .then(station => next(null, station))
  2225. .catch(next);
  2226. }
  2227. ],
  2228. async (err, station) => {
  2229. if (err) {
  2230. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2231. this.log("ERROR", "UNFAVORITE_STATION", `Unfavoriting station "${stationId}" failed. "${err}"`);
  2232. return cb({ status: "error", message: err });
  2233. }
  2234. this.log("SUCCESS", "UNFAVORITE_STATION", `Unfavorited station "${stationId}" successfully.`);
  2235. CacheModule.runJob("PUB", {
  2236. channel: "user.unfavoritedStation",
  2237. value: {
  2238. userId: session.userId,
  2239. stationId
  2240. }
  2241. });
  2242. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2243. userId: session.userId,
  2244. type: "station__unfavorite",
  2245. payload: {
  2246. message: `Unfavorited station <stationId>${station.displayName}</stationId>`,
  2247. stationId
  2248. }
  2249. });
  2250. return cb({
  2251. status: "success",
  2252. message: "Succesfully unfavorited station."
  2253. });
  2254. }
  2255. );
  2256. }),
  2257. /**
  2258. * Clears every station queue
  2259. *
  2260. * @param {object} session - the session object automatically added by socket.io
  2261. * @param {Function} cb - gets called with the result
  2262. */
  2263. clearEveryStationQueue: useHasPermission(
  2264. "stations.clearEveryStationQueue",
  2265. async function clearEveryStationQueue(session, cb) {
  2266. this.keepLongJob();
  2267. this.publishProgress({
  2268. status: "started",
  2269. title: "Clear every station queue",
  2270. message: "Clearing every station queue.",
  2271. id: this.toString()
  2272. });
  2273. await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
  2274. await CacheModule.runJob(
  2275. "PUB",
  2276. {
  2277. channel: "longJob.added",
  2278. value: { jobId: this.toString(), userId: session.userId }
  2279. },
  2280. this
  2281. );
  2282. async.waterfall(
  2283. [
  2284. next => {
  2285. StationsModule.runJob("CLEAR_EVERY_STATION_QUEUE", {}, this)
  2286. .then(() => next())
  2287. .catch(next);
  2288. }
  2289. ],
  2290. async err => {
  2291. if (err) {
  2292. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2293. this.log("ERROR", "CLEAR_EVERY_STATION_QUEUE", `Clearing every station queue failed. "${err}"`);
  2294. this.publishProgress({
  2295. status: "error",
  2296. message: err
  2297. });
  2298. return cb({ status: "error", message: err });
  2299. }
  2300. this.log("SUCCESS", "CLEAR_EVERY_STATION_QUEUE", "Clearing every station queue was successful.");
  2301. this.publishProgress({
  2302. status: "success",
  2303. message: "Successfully cleared every station queue."
  2304. });
  2305. return cb({ status: "success", message: "Successfully cleared every station queue." });
  2306. }
  2307. );
  2308. }
  2309. ),
  2310. /**
  2311. * Reset a station queue
  2312. *
  2313. * @param {object} session - the session object automatically added by socket.io
  2314. * @param {string} stationId - the station id
  2315. * @param {Function} cb - gets called with the result
  2316. */
  2317. async resetQueue(session, stationId, cb) {
  2318. async.waterfall(
  2319. [
  2320. next => {
  2321. hasPermission("stations.queue.reset", session, stationId)
  2322. .then(() => next())
  2323. .catch(next);
  2324. },
  2325. next => {
  2326. StationsModule.runJob("RESET_QUEUE", { stationId }, this)
  2327. .then(() => next())
  2328. .catch(next);
  2329. }
  2330. ],
  2331. async err => {
  2332. if (err) {
  2333. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2334. this.log("ERROR", "RESET_QUEUE", `Resetting station queue failed. "${err}"`);
  2335. return cb({ status: "error", message: err });
  2336. }
  2337. this.log("SUCCESS", "RESET_QUEUE", "Resetting station queue was successful.");
  2338. return cb({ status: "success", message: "Successfully reset station queue." });
  2339. }
  2340. );
  2341. },
  2342. /**
  2343. * Gets skip votes for a station
  2344. *
  2345. * @param session
  2346. * @param stationId - the station id
  2347. * @param stationId - the song id to get skipvotes for
  2348. * @param cb
  2349. */
  2350. getSkipVotes: isLoginRequired(async function getSkipVotes(session, stationId, songId, cb) {
  2351. async.waterfall(
  2352. [
  2353. next => {
  2354. StationsModule.runJob("GET_STATION", { stationId }, this)
  2355. .then(res => next(null, res.currentSong))
  2356. .catch(console.log);
  2357. },
  2358. (currentSong, next) => {
  2359. if (currentSong && currentSong._id === songId)
  2360. next(null, {
  2361. skipVotes: currentSong.skipVotes.length,
  2362. skipVotesCurrent: true,
  2363. voted: currentSong.skipVotes.indexOf(session.userId) !== -1
  2364. });
  2365. else
  2366. next(null, {
  2367. skipVotes: 0,
  2368. skipVotesCurrent: false,
  2369. voted: false
  2370. });
  2371. }
  2372. ],
  2373. async (err, data) => {
  2374. if (err) {
  2375. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2376. this.log(
  2377. "ERROR",
  2378. "STATIONS_GET_SKIP_VOTES",
  2379. `User "${session.userId}" failed to get skip votes for ${stationId}. "${err}"`
  2380. );
  2381. return cb({ status: "error", message: err });
  2382. }
  2383. return cb({
  2384. status: "success",
  2385. data
  2386. });
  2387. }
  2388. );
  2389. }),
  2390. /**
  2391. * Add DJ to station
  2392. *
  2393. * @param {object} session - the session object automatically added by socket.io
  2394. * @param {string} stationId - the station id
  2395. * @param {string} userId - the dj user id
  2396. * @param {Function} cb - gets called with the result
  2397. */
  2398. async addDj(session, stationId, userId, cb) {
  2399. async.waterfall(
  2400. [
  2401. next => {
  2402. hasPermission("stations.djs.add", session, stationId)
  2403. .then(() => next())
  2404. .catch(next);
  2405. },
  2406. next => {
  2407. StationsModule.runJob("ADD_DJ", { stationId, userId }, this)
  2408. .then(() => next())
  2409. .catch(next);
  2410. }
  2411. ],
  2412. async err => {
  2413. if (err) {
  2414. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2415. this.log("ERROR", "ADD_DJ", `Adding DJ failed. "${err}"`);
  2416. return cb({ status: "error", message: err });
  2417. }
  2418. this.log("SUCCESS", "ADD_DJ", "Adding DJ was successful.");
  2419. return cb({ status: "success", message: "Successfully added DJ." });
  2420. }
  2421. );
  2422. },
  2423. /**
  2424. * Remove DJ from station
  2425. *
  2426. * @param {object} session - the session object automatically added by socket.io
  2427. * @param {string} stationId - the station id
  2428. * @param {string} userId - the dj user id
  2429. * @param {Function} cb - gets called with the result
  2430. */
  2431. async removeDj(session, stationId, userId, cb) {
  2432. async.waterfall(
  2433. [
  2434. next => {
  2435. hasPermission("stations.djs.remove", session, stationId)
  2436. .then(() => next())
  2437. .catch(next);
  2438. },
  2439. next => {
  2440. StationsModule.runJob("REMOVE_DJ", { stationId, userId }, this)
  2441. .then(() => next())
  2442. .catch(next);
  2443. }
  2444. ],
  2445. async err => {
  2446. if (err) {
  2447. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2448. this.log("ERROR", "REMOVE_DJ", `Removing DJ failed. "${err}"`);
  2449. return cb({ status: "error", message: err });
  2450. }
  2451. this.log("SUCCESS", "REMOVE_DJ", "Removing DJ was successful.");
  2452. return cb({ status: "success", message: "Successfully removed DJ." });
  2453. }
  2454. );
  2455. }
  2456. };