stations.js 72 KB


  1. import async from "async";
  2. import mongoose from "mongoose";
  3. import config from "config";
  4. import { isLoginRequired, isOwnerRequired, isAdminRequired } from "./hooks";
  5. import moduleManager from "../../index";
  6. const DBModule = moduleManager.modules.db;
  7. const UtilsModule = moduleManager.modules.utils;
  8. const WSModule = moduleManager.modules.ws;
  9. const SongsModule = moduleManager.modules.songs;
  10. const PlaylistsModule = moduleManager.modules.playlists;
  11. const CacheModule = moduleManager.modules.cache;
  12. const NotificationsModule = moduleManager.modules.notifications;
  13. const StationsModule = moduleManager.modules.stations;
  14. const ActivitiesModule = moduleManager.modules.activities;
  // Relays the refreshed user list of a station to every socket in that station's room.
  CacheModule.runJob("SUB", {
    channel: "station.updateUsers",
    cb: payload => {
      const { stationId, usersPerStation: users } = payload;
      const room = `station.${stationId}`;

      WSModule.runJob("EMIT_TO_ROOM", {
        room,
        args: ["event:station.users.updated", { data: { users } }]
      });
    }
  });
  /**
   * Broadcasts a station's updated user count.
   *
   * - The station room always receives the new count.
   * - For public stations the whole "home" room is notified.
   * - For non-public stations only "home" sockets that belong to an admin, or to the owner
   *   of a community station, are notified individually.
   *
   * Every notification uses the same `userCount` key so that all listeners of
   * "event:station.userCount.updated" can read the payload the same way.
   */
  CacheModule.runJob("SUB", {
    channel: "station.updateUserCount",
    cb: ({ stationId, usersPerStationCount }) => {
      const eventName = "event:station.userCount.updated";
      const count = usersPerStationCount || 0;

      WSModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: [eventName, { data: { userCount: count } }]
      });

      StationsModule.runJob("GET_STATION", { stationId })
        .then(async station => {
          if (station.privacy === "public") {
            WSModule.runJob("EMIT_TO_ROOM", {
              room: "home",
              args: [eventName, { data: { stationId, userCount: count } }]
            });
            return;
          }

          // The model lookup does not depend on the socket, so resolve it once up front.
          const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
          const socketIds = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: "home" });

          // Dispatches the count to a single socket if its user may know about the station.
          const notifySocketIfAllowed = async socketId => {
            const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId });
            if (!socket || !socket.session || !socket.session.sessionId) return;

            const session = await CacheModule.runJob("HGET", {
              table: "sessions",
              key: socket.session.sessionId
            });
            if (!session) return;

            userModel.findOne({ _id: session.userId }, (err, user) => {
              // A failed lookup or a deleted user simply means this socket is not notified.
              if (err || !user) return;

              const isAdmin = user.role === "admin";
              const isOwner = station.type === "community" && station.owner === session.userId;
              if (isAdmin || isOwner) socket.dispatch(eventName, { data: { stationId, userCount: count } });
            });
          };

          socketIds.forEach(socketId => {
            notifySocketIfAllowed(socketId).catch(console.log);
          });
        })
        .catch(console.log);
    }
  });
  /**
   * When a playlist is added to a station's autofill, loads the playlist and announces it
   * to the station room and the manage-station room.
   * Failures to load the playlist or to emit are logged instead of being left unhandled.
   */
  CacheModule.runJob("SUB", {
    channel: "station.autofillPlaylist",
    cb: data => {
      const { stationId, playlistId } = data;

      PlaylistsModule.runJob("GET_PLAYLIST", { playlistId })
        .then(playlist =>
          WSModule.runJob("EMIT_TO_ROOMS", {
            rooms: [`station.${stationId}`, `manage-station.${stationId}`],
            args: ["event:station.autofillPlaylist", { data: { stationId, playlist } }]
          })
        )
        .catch(console.log);
    }
  });
  /**
   * When a playlist is blacklisted on a station, loads the playlist and announces it
   * to the station room and the manage-station room.
   * Failures to load the playlist or to emit are logged instead of being left unhandled.
   */
  CacheModule.runJob("SUB", {
    channel: "station.blacklistedPlaylist",
    cb: data => {
      const { stationId, playlistId } = data;

      PlaylistsModule.runJob("GET_PLAYLIST", { playlistId })
        .then(playlist =>
          WSModule.runJob("EMIT_TO_ROOMS", {
            rooms: [`station.${stationId}`, `manage-station.${stationId}`],
            args: ["event:station.blacklistedPlaylist", { data: { stationId, playlist } }]
          })
        )
        .catch(console.log);
    }
  });
  // Tells the station room and the manage-station room that a playlist left the autofill list.
  CacheModule.runJob("SUB", {
    channel: "station.removedAutofillPlaylist",
    cb: ({ stationId, playlistId }) => {
      const rooms = [`station.${stationId}`, `manage-station.${stationId}`];
      const payload = { data: { stationId, playlistId } };

      WSModule.runJob("EMIT_TO_ROOMS", {
        rooms,
        args: ["event:station.removedAutofillPlaylist", payload]
      });
    }
  });
  // Tells the station room and the manage-station room that a playlist left the blacklist.
  CacheModule.runJob("SUB", {
    channel: "station.removedBlacklistedPlaylist",
    cb: ({ stationId, playlistId }) => {
      const rooms = [`station.${stationId}`, `manage-station.${stationId}`];
      const payload = { data: { stationId, playlistId } };

      WSModule.runJob("EMIT_TO_ROOMS", {
        rooms,
        args: ["event:station.removedBlacklistedPlaylist", payload]
      });
    }
  });
  /**
   * Announces that a station was paused.
   *
   * The station room receives `pausedAt`, the manage-station room receives `stationId` and
   * `pausedAt`, and every "home" socket that is allowed to know about the station receives
   * the station id. Any failure while loading the station or resolving the sockets is logged
   * rather than surfacing as an unhandled rejection.
   */
  CacheModule.runJob("SUB", {
    channel: "station.pause",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId })
        .then(station => {
          WSModule.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:station.pause", { data: { pausedAt: station.pausedAt } }]
          });

          WSModule.runJob("EMIT_TO_ROOM", {
            room: `manage-station.${stationId}`,
            args: ["event:station.pause", { data: { stationId, pausedAt: station.pausedAt } }]
          });

          return StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
            room: `home`,
            station
          });
        })
        .then(response => {
          const { socketsThatCan } = response;
          socketsThatCan.forEach(socket => {
            socket.dispatch("event:station.pause", { data: { stationId } });
          });
        })
        .catch(console.log);
    }
  });
  /**
   * Announces that a station was resumed.
   *
   * The station room receives `timePaused`, the manage-station room receives `stationId` and
   * `timePaused`, and every "home" socket that is allowed to know about the station receives
   * the station id. The single trailing catch also covers a failing GET_STATION job, not only
   * the socket lookup.
   */
  CacheModule.runJob("SUB", {
    channel: "station.resume",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId })
        .then(station => {
          WSModule.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:station.resume", { data: { timePaused: station.timePaused } }]
          });

          WSModule.runJob("EMIT_TO_ROOM", {
            room: `manage-station.${stationId}`,
            args: ["event:station.resume", { data: { stationId, timePaused: station.timePaused } }]
          });

          return StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
            room: `home`,
            station
          });
        })
        .then(response => {
          const { socketsThatCan } = response;
          socketsThatCan.forEach(socket => {
            socket.dispatch("event:station.resume", { data: { stationId } });
          });
        })
        .catch(console.log);
    }
  });
  /**
   * Pushes a station's updated queue to the station room and the manage-station room.
   *
   * If the station has no current song but does have queued songs, the station is
   * (re)initialized first. Rejections of either job are logged instead of being left
   * unhandled.
   */
  CacheModule.runJob("SUB", {
    channel: "station.queueUpdate",
    cb: stationId => {
      StationsModule.runJob("GET_STATION", { stationId })
        .then(station => {
          if (!station.currentSong && station.queue.length > 0) {
            // Fire-and-forget: the queue events below do not wait for the initialization.
            StationsModule.runJob("INITIALIZE_STATION", { stationId }).catch(console.log);
          }

          WSModule.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:station.queue.updated", { data: { queue: station.queue } }]
          });

          WSModule.runJob("EMIT_TO_ROOM", {
            room: `manage-station.${stationId}`,
            args: ["event:manageStation.queue.updated", { data: { stationId, queue: station.queue } }]
          });
        })
        .catch(console.log);
    }
  });
  // Announces a repositioned queue song to the station room and the manage-station room.
  CacheModule.runJob("SUB", {
    channel: "station.repositionSongInQueue",
    cb: ({ stationId, song }) => {
      // Listeners in the station room only need the song itself.
      WSModule.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:station.queue.song.repositioned", { data: { song } }]
      });

      // The manage-station room additionally receives the station id.
      WSModule.runJob("EMIT_TO_ROOM", {
        room: `manage-station.${stationId}`,
        args: ["event:manageStation.queue.song.repositioned", { data: { stationId, song } }]
      });
    }
  });
  // Notifies everyone in a station's room that a skip vote was cast (no payload is sent).
  CacheModule.runJob("SUB", {
    channel: "station.voteSkipSong",
    cb: stationId => {
      const room = `station.${stationId}`;
      WSModule.runJob("EMIT_TO_ROOM", { room, args: ["event:station.voteSkipSong"] });
    }
  });
  // Announces a deleted station to its own rooms, the home room and the admin stations room.
  CacheModule.runJob("SUB", {
    channel: "station.remove",
    cb: stationId => {
      // Emitted in this order; the station's own rooms get no payload.
      const notifications = [
        { room: `station.${stationId}`, args: ["event:station.deleted"] },
        { room: `manage-station.${stationId}`, args: ["event:station.deleted"] },
        { room: `home`, args: ["event:station.deleted", { data: { stationId } }] },
        { room: "admin.stations", args: ["event:admin.station.deleted", { data: { stationId } }] }
      ];

      notifications.forEach(notification => {
        WSModule.runJob("EMIT_TO_ROOM", notification);
      });
    }
  });
  /**
   * Initializes a newly created station and announces it.
   *
   * - The admin stations room always receives the station.
   * - Public stations are announced to the whole "home" room.
   * - Non-public stations are only dispatched to "home" sockets whose user is an admin or
   *   the owner of the (community) station.
   *
   * Failed jobs are logged, and sockets whose user can no longer be found are skipped
   * instead of throwing inside the database callback.
   */
  CacheModule.runJob("SUB", {
    channel: "station.create",
    cb: async stationId => {
      try {
        const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
        const { station } = await StationsModule.runJob("INITIALIZE_STATION", { stationId });

        station.userCount = StationsModule.usersPerStationCount[stationId] || 0;

        WSModule.runJob("EMIT_TO_ROOM", {
          room: "admin.stations",
          args: ["event:admin.station.created", { data: { station } }]
        });

        if (station.privacy === "public") {
          WSModule.runJob("EMIT_TO_ROOM", {
            room: "home",
            args: ["event:station.created", { data: { station } }]
          });
          return;
        }

        // Dispatches the station to a single socket if its user may know about it.
        const notifySocketIfAllowed = async socketId => {
          const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId });
          if (!socket || !socket.session || !socket.session.sessionId) return;

          const session = await CacheModule.runJob("HGET", {
            table: "sessions",
            key: socket.session.sessionId
          });
          if (!session) return;

          userModel.findOne({ _id: session.userId }, (err, user) => {
            // A failed lookup or a deleted user simply means this socket is not notified.
            if (err || !user) return;

            const isAdmin = user.role === "admin";
            const isOwner = station.type === "community" && station.owner === session.userId;
            if (isAdmin || isOwner) socket.dispatch("event:station.created", { data: { station } });
          });
        };

        const socketIds = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: "home" });
        socketIds.forEach(socketId => {
          notifySocketIfAllowed(socketId).catch(console.log);
        });
      } catch (err) {
        console.log(err);
      }
    }
  });
  /**
   * Announces an updated station.
   *
   * The station, manage-station and admin rooms receive the selected station fields, and
   * "home" sockets that may know about the station receive the same update. If the privacy
   * changed, a station that became public is announced to "home" as created, and a station
   * that stopped being public is reported as deleted to the "home" sockets that may no
   * longer know about it.
   *
   * Nothing is emitted when the station cannot be loaded, and the socket visibility job is
   * resolved once for both the "can" and "cannot" audiences.
   */
  CacheModule.runJob("SUB", {
    channel: "station.updated",
    cb: async data => {
      const stationModel = await DBModule.runJob("GET_MODEL", {
        modelName: "station"
      });
      const { stationId, previousStation } = data;

      stationModel.findOne(
        { _id: stationId },
        ["_id", "name", "displayName", "description", "type", "privacy", "owner", "requests", "autofill", "theme"],
        (err, station) => {
          if (err || !station) {
            // Without a station document there is nothing meaningful to broadcast.
            if (err) console.log(err);
            return;
          }

          WSModule.runJob("EMIT_TO_ROOMS", {
            rooms: [`station.${stationId}`, `manage-station.${stationId}`, "admin.stations"],
            args: ["event:station.updated", { data: { station } }]
          });

          const privacyChanged = Boolean(previousStation) && previousStation.privacy !== station.privacy;
          const becamePublic = privacyChanged && station.privacy === "public";
          const becameHidden = privacyChanged && !becamePublic && previousStation.privacy === "public";

          if (becamePublic) {
            WSModule.runJob("EMIT_TO_ROOM", {
              room: "home",
              args: ["event:station.created", { data: { station } }]
            });
          }

          StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
            room: `home`,
            station
          })
            .then(response => {
              const { socketsThatCan, socketsThatCannot } = response;

              socketsThatCan.forEach(socket => {
                socket.dispatch("event:station.updated", { data: { station } });
              });

              if (becameHidden) {
                socketsThatCannot.forEach(socket => {
                  socket.dispatch("event:station.deleted", { data: { stationId } });
                });
              }
            })
            .catch(console.log);
        }
      );
    }
  });
  329. export default {
  330. /**
  331. * Get a list of all the stations
  332. *
  333. * @param {object} session - user session
  334. * @param {Function} cb - callback
  335. */
  336. async index(session, cb) {
  337. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  338. async.waterfall(
  339. [
  340. // get array of the ids of the user's favorite stations
  341. next => {
  342. if (session.userId)
  343. return userModel.findById(session.userId).select({ favoriteStations: -1 }).exec(next);
  344. return next(null, { favoriteStations: [] });
  345. },
  346. ({ favoriteStations }, next) => {
  347. CacheModule.runJob("HGETALL", { table: "stations" }, this).then(stations =>
  348. next(null, stations, favoriteStations)
  349. );
  350. },
  351. (stations, favorited, next) => {
  352. const filteredStations = [];
  353. async.eachLimit(
  354. stations,
  355. 1,
  356. (station, nextStation) => {
  357. async.waterfall(
  358. [
  359. callback => {
  360. // only relevant if user logged in
  361. if (session.userId) {
  362. if (favorited.indexOf(station._id) !== -1) station.isFavorited = true;
  363. return callback();
  364. }
  365. return callback();
  366. },
  367. callback => {
  368. StationsModule.runJob(
  369. "CAN_USER_VIEW_STATION",
  370. {
  371. station,
  372. userId: session.userId,
  373. homeView: true
  374. },
  375. this
  376. )
  377. .then(exists => callback(null, exists))
  378. .catch(callback);
  379. }
  380. ],
  381. (err, exists) => {
  382. if (err) return this.log("ERROR", "STATIONS_INDEX", err);
  383. station.userCount = StationsModule.usersPerStationCount[station._id] || 0;
  384. if (exists) filteredStations.push(station);
  385. return nextStation();
  386. }
  387. );
  388. },
  389. () => next(null, filteredStations, favorited)
  390. );
  391. }
  392. ],
  393. async (err, stations, favorited) => {
  394. if (err) {
  395. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  396. this.log("ERROR", "STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  397. return cb({ status: "error", message: err });
  398. }
  399. this.log("SUCCESS", "STATIONS_INDEX", `Indexing stations successful.`, false);
  400. return cb({ status: "success", data: { stations, favorited } });
  401. }
  402. );
  403. },
  404. /**
  405. * Gets stations, used in the admin stations page by the AdvancedTable component
  406. *
  407. * @param {object} session - the session object automatically added by the websocket
  408. * @param page - the page
  409. * @param pageSize - the size per page
  410. * @param properties - the properties to return for each station
  411. * @param sort - the sort object
  412. * @param queries - the queries array
  413. * @param operator - the operator for queries
  414. * @param cb
  415. */
  416. getData: isAdminRequired(async function getSet(session, page, pageSize, properties, sort, queries, operator, cb) {
  417. async.waterfall(
  418. [
  419. next => {
  420. DBModule.runJob(
  421. "GET_DATA",
  422. {
  423. page,
  424. pageSize,
  425. properties,
  426. sort,
  427. queries,
  428. operator,
  429. modelName: "station",
  430. blacklistedProperties: [],
  431. specialProperties: {
  432. owner: [
  433. {
  434. $addFields: {
  435. ownerOID: {
  436. $convert: {
  437. input: "$owner",
  438. to: "objectId",
  439. onError: "unknown",
  440. onNull: "unknown"
  441. }
  442. }
  443. }
  444. },
  445. {
  446. $lookup: {
  447. from: "users",
  448. localField: "ownerOID",
  449. foreignField: "_id",
  450. as: "ownerUser"
  451. }
  452. },
  453. {
  454. $unwind: {
  455. path: "$ownerUser",
  456. preserveNullAndEmptyArrays: true
  457. }
  458. },
  459. {
  460. $addFields: {
  461. ownerUsername: {
  462. $cond: [
  463. { $eq: [{ $type: "$owner" }, "string"] },
  464. { $ifNull: ["$ownerUser.username", "unknown"] },
  465. "none"
  466. ]
  467. }
  468. }
  469. },
  470. {
  471. $project: {
  472. ownerOID: 0,
  473. ownerUser: 0
  474. }
  475. }
  476. ]
  477. },
  478. specialQueries: {
  479. owner: newQuery => ({ $or: [newQuery, { ownerUsername: newQuery.owner }] })
  480. }
  481. },
  482. this
  483. )
  484. .then(response => {
  485. next(null, response);
  486. })
  487. .catch(err => {
  488. next(err);
  489. });
  490. }
  491. ],
  492. async (err, response) => {
  493. if (err) {
  494. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  495. this.log("ERROR", "STATIONS_GET_DATA", `Failed to get data from stations. "${err}"`);
  496. return cb({ status: "error", message: err });
  497. }
  498. this.log("SUCCESS", "STATIONS_GET_DATA", `Got data from stations successfully.`);
  499. return cb({ status: "success", message: "Successfully got data from stations.", data: response });
  500. }
  501. );
  502. }),
  503. /**
  504. * Obtains basic metadata of a station in order to format an activity
  505. *
  506. * @param {object} session - user session
  507. * @param {string} stationId - the station id
  508. * @param {Function} cb - callback
  509. */
  510. getStationForActivity(session, stationId, cb) {
  511. async.waterfall(
  512. [
  513. next => {
  514. StationsModule.runJob("GET_STATION", { stationId }, this)
  515. .then(station => {
  516. next(null, station);
  517. })
  518. .catch(next);
  519. }
  520. ],
  521. async (err, station) => {
  522. if (err) {
  523. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  524. this.log(
  525. "ERROR",
  526. "STATIONS_GET_STATION_FOR_ACTIVITY",
  527. `Failed to obtain metadata of station ${stationId} for activity formatting. "${err}"`
  528. );
  529. return cb({ status: "error", message: err });
  530. }
  531. this.log(
  532. "SUCCESS",
  533. "STATIONS_GET_STATION_FOR_ACTIVITY",
  534. `Obtained metadata of station ${stationId} for activity formatting successfully.`
  535. );
  536. return cb({
  537. status: "success",
  538. data: {
  539. title: station.displayName,
  540. thumbnail: station.currentSong ? station.currentSong.thumbnail : ""
  541. }
  542. });
  543. }
  544. );
  545. },
  546. /**
  547. * Verifies that a station exists from its name
  548. *
  549. * @param {object} session - user session
  550. * @param {string} stationName - the station name
  551. * @param {Function} cb - callback
  552. */
  553. existsByName(session, stationName, cb) {
  554. async.waterfall(
  555. [
  556. next => {
  557. StationsModule.runJob("GET_STATION_BY_NAME", { stationName }, this)
  558. .then(station => next(null, station))
  559. .catch(next);
  560. },
  561. (station, next) => {
  562. if (!station) return next(null, false);
  563. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  564. .then(exists => next(null, exists))
  565. .catch(next);
  566. }
  567. ],
  568. async (err, exists) => {
  569. if (err) {
  570. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  571. this.log(
  572. "ERROR",
  573. "STATION_EXISTS_BY_NAME",
  574. `Checking if station "${stationName}" exists failed. "${err}"`
  575. );
  576. return cb({ status: "error", message: err });
  577. }
  578. this.log(
  579. "SUCCESS",
  580. "STATION_EXISTS_BY_NAME",
  581. `Station "${stationName}" exists successfully.` /* , false */
  582. );
  583. return cb({ status: "success", data: { exists } });
  584. }
  585. );
  586. },
  587. /**
  588. * Verifies that a station exists from its id
  589. *
  590. * @param {object} session - user session
  591. * @param {string} stationId - the station id
  592. * @param {Function} cb - callback
  593. */
  594. existsById(session, stationId, cb) {
  595. async.waterfall(
  596. [
  597. next => {
  598. StationsModule.runJob("GET_STATION", { stationId }, this)
  599. .then(station => next(null, station))
  600. .catch(next);
  601. },
  602. (station, next) => {
  603. if (!station) return next(null, false);
  604. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  605. .then(exists => next(null, exists))
  606. .catch(next);
  607. }
  608. ],
  609. async (err, exists) => {
  610. if (err) {
  611. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  612. this.log(
  613. "ERROR",
  614. "STATION_EXISTS_BY_ID",
  615. `Checking if station "${stationId}" exists failed. "${err}"`
  616. );
  617. return cb({ status: "error", message: err });
  618. }
  619. this.log(
  620. "SUCCESS",
  621. "STATION_EXISTS_BY_ID",
  622. `Station "${stationId}" exists successfully.` /* , false */
  623. );
  624. return cb({ status: "success", data: { exists } });
  625. }
  626. );
  627. },
  628. /**
  629. * Gets the official playlist for a station
  630. *
  631. * @param {object} session - user session
  632. * @param {string} stationId - the station id
  633. * @param {Function} cb - callback
  634. */
  635. getPlaylist(session, stationId, cb) {
  636. async.waterfall(
  637. [
  638. next => {
  639. StationsModule.runJob("GET_STATION", { stationId }, this)
  640. .then(station => {
  641. next(null, station);
  642. })
  643. .catch(next);
  644. },
  645. (station, next) => {
  646. StationsModule.runJob(
  647. "CAN_USER_VIEW_STATION",
  648. {
  649. station,
  650. userId: session.userId
  651. },
  652. this
  653. )
  654. .then(canView => {
  655. if (canView) return next(null, station);
  656. return next("Insufficient permissions.");
  657. })
  658. .catch(err => next(err));
  659. },
  660. (station, next) => {
  661. if (!station) return next("Station not found.");
  662. if (station.type !== "official") return next("This is not an official station.");
  663. return next();
  664. },
  665. next => {
  666. CacheModule.runJob(
  667. "HGET",
  668. {
  669. table: "officialPlaylists",
  670. key: stationId
  671. },
  672. this
  673. )
  674. .then(playlist => {
  675. next(null, playlist);
  676. })
  677. .catch(next);
  678. },
  679. (playlist, next) => {
  680. if (!playlist) return next("Playlist not found.");
  681. return next(null, playlist);
  682. }
  683. ],
  684. async (err, playlist) => {
  685. if (err) {
  686. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  687. this.log(
  688. "ERROR",
  689. "STATIONS_GET_PLAYLIST",
  690. `Getting playlist for station "${stationId}" failed. "${err}"`
  691. );
  692. return cb({ status: "error", message: err });
  693. }
  694. this.log(
  695. "SUCCESS",
  696. "STATIONS_GET_PLAYLIST",
  697. `Got playlist for station "${stationId}" successfully.`,
  698. false
  699. );
  700. return cb({ status: "success", data: { songs: playlist.songs } });
  701. }
  702. );
  703. },
  704. /**
  705. * Joins the station by its name
  706. *
  707. * @param {object} session - user session
  708. * @param {string} stationIdentifier - the station name or station id
  709. * @param {Function} cb - callback
  710. */
  711. join(session, stationIdentifier, cb) {
  712. async.waterfall(
  713. [
  714. next => {
  715. StationsModule.runJob("GET_STATION_BY_NAME", { stationName: stationIdentifier }, this)
  716. .then(station => next(null, station))
  717. .catch(() =>
  718. // station identifier may be using stationid instead
  719. StationsModule.runJob("GET_STATION", { stationId: stationIdentifier }, this)
  720. .then(station => next(null, station))
  721. .catch(next)
  722. );
  723. },
  724. (station, next) => {
  725. if (!station) return next("Station not found.");
  726. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  727. .then(canView => {
  728. if (!canView) next("Not allowed to join station.");
  729. else next(null, station);
  730. })
  731. .catch(err => next(err));
  732. },
  733. (station, next) => {
  734. WSModule.runJob("SOCKET_JOIN_ROOM", {
  735. socketId: session.socketId,
  736. room: `station.${station._id}`
  737. });
  738. const data = {
  739. _id: station._id,
  740. type: station.type,
  741. currentSong: station.currentSong,
  742. startedAt: station.startedAt,
  743. paused: station.paused,
  744. timePaused: station.timePaused,
  745. pausedAt: station.pausedAt,
  746. description: station.description,
  747. displayName: station.displayName,
  748. name: station.name,
  749. privacy: station.privacy,
  750. requests: station.requests,
  751. autofill: station.autofill,
  752. owner: station.owner,
  753. blacklist: station.blacklist,
  754. theme: station.theme
  755. };
  756. StationsModule.userList[session.socketId] = station._id;
  757. next(null, data);
  758. },
  759. (data, next) => {
  760. data = JSON.parse(JSON.stringify(data));
  761. data.userCount = StationsModule.usersPerStationCount[data._id] || 0;
  762. data.users = StationsModule.usersPerStation[data._id] || [];
  763. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  764. WSModule.runJob("SOCKET_JOIN_SONG_ROOM", {
  765. socketId: session.socketId,
  766. room: `song.${data.currentSong.youtubeId}`
  767. });
  768. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  769. return SongsModule.runJob(
  770. "GET_SONG_FROM_YOUTUBE_ID",
  771. { youtubeId: data.currentSong.youtubeId },
  772. this
  773. )
  774. .then(response => {
  775. const { song } = response;
  776. if (song) {
  777. data.currentSong.likes = song.likes;
  778. data.currentSong.dislikes = song.dislikes;
  779. } else {
  780. data.currentSong.likes = -1;
  781. data.currentSong.dislikes = -1;
  782. }
  783. })
  784. .catch(() => {
  785. data.currentSong.likes = -1;
  786. data.currentSong.dislikes = -1;
  787. })
  788. .finally(() => next(null, data));
  789. },
  790. (data, next) => {
  791. // only relevant if user logged in
  792. if (session.userId) {
  793. return StationsModule.runJob(
  794. "HAS_USER_FAVORITED_STATION",
  795. {
  796. userId: session.userId,
  797. stationId: data._id
  798. },
  799. this
  800. )
  801. .then(isStationFavorited => {
  802. data.isFavorited = isStationFavorited;
  803. return next(null, data);
  804. })
  805. .catch(err => next(err));
  806. }
  807. return next(null, data);
  808. }
  809. ],
  810. async (err, data) => {
  811. if (err) {
  812. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  813. this.log("ERROR", "STATIONS_JOIN", `Joining station "${stationIdentifier}" failed. "${err}"`);
  814. return cb({ status: "error", message: err });
  815. }
  816. this.log("SUCCESS", "STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  817. return cb({ status: "success", data });
  818. }
  819. );
  820. },
  821. /**
  822. * Gets a station by id
  823. *
  824. * @param {object} session - user session
  825. * @param {string} stationId - the station id
  826. * @param {Function} cb - callback
  827. */
  828. getStationById(session, stationId, cb) {
  829. async.waterfall(
  830. [
  831. next => {
  832. StationsModule.runJob("GET_STATION", { stationId }, this)
  833. .then(station => {
  834. next(null, station);
  835. })
  836. .catch(next);
  837. },
  838. (station, next) => {
  839. if (!station) return next("Station not found.");
  840. return StationsModule.runJob(
  841. "CAN_USER_VIEW_STATION",
  842. {
  843. station,
  844. userId: session.userId
  845. },
  846. this
  847. )
  848. .then(canView => {
  849. if (!canView) next("Not allowed to get station.");
  850. else next(null, station);
  851. })
  852. .catch(err => next(err));
  853. },
  854. (station, next) => {
  855. const data = {
  856. _id: station._id,
  857. type: station.type,
  858. description: station.description,
  859. displayName: station.displayName,
  860. name: station.name,
  861. privacy: station.privacy,
  862. requests: station.requests,
  863. autofill: station.autofill,
  864. owner: station.owner,
  865. theme: station.theme,
  866. paused: station.paused,
  867. currentSong: station.currentSong
  868. };
  869. next(null, data);
  870. }
  871. ],
  872. async (err, data) => {
  873. if (err) {
  874. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  875. this.log("ERROR", "GET_STATION_BY_ID", `Getting station "${stationId}" failed. "${err}"`);
  876. return cb({ status: "error", message: err });
  877. }
  878. this.log("SUCCESS", "GET_STATION_BY_ID", `Got station "${stationId}" successfully.`);
  879. return cb({ status: "success", data: { station: data } });
  880. }
  881. );
  882. },
  883. getStationAutofillPlaylistsById(session, stationId, cb) {
  884. async.waterfall(
  885. [
  886. next => {
  887. StationsModule.runJob("GET_STATION", { stationId }, this)
  888. .then(station => {
  889. next(null, station);
  890. })
  891. .catch(next);
  892. },
  893. (station, next) => {
  894. if (!station) return next("Station not found.");
  895. return StationsModule.runJob(
  896. "CAN_USER_VIEW_STATION",
  897. {
  898. station,
  899. userId: session.userId
  900. },
  901. this
  902. )
  903. .then(canView => {
  904. if (!canView) next("Not allowed to get station.");
  905. else next(null, station);
  906. })
  907. .catch(err => next(err));
  908. },
  909. (station, next) => {
  910. const playlists = [];
  911. async.eachLimit(
  912. station.autofill.playlists,
  913. 1,
  914. (playlistId, next) => {
  915. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
  916. .then(playlist => {
  917. playlists.push(playlist);
  918. next();
  919. })
  920. .catch(() => {
  921. playlists.push(null);
  922. next();
  923. });
  924. },
  925. err => {
  926. next(err, playlists);
  927. }
  928. );
  929. }
  930. ],
  931. async (err, playlists) => {
  932. if (err) {
  933. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  934. this.log(
  935. "ERROR",
  936. "GET_STATION_AUTOFILL_PLAYLISTS_BY_ID",
  937. `Getting station "${stationId}"'s autofilling playlists failed. "${err}"`
  938. );
  939. return cb({ status: "error", message: err });
  940. }
  941. this.log(
  942. "SUCCESS",
  943. "GET_STATION_AUTOFILL_PLAYLISTS_BY_ID",
  944. `Got station "${stationId}"'s autofilling playlists successfully.`
  945. );
  946. return cb({ status: "success", data: { playlists } });
  947. }
  948. );
  949. },
  950. getStationBlacklistById(session, stationId, cb) {
  951. async.waterfall(
  952. [
  953. next => {
  954. StationsModule.runJob("GET_STATION", { stationId }, this)
  955. .then(station => {
  956. next(null, station);
  957. })
  958. .catch(next);
  959. },
  960. (station, next) => {
  961. if (!station) return next("Station not found.");
  962. return StationsModule.runJob(
  963. "CAN_USER_VIEW_STATION",
  964. {
  965. station,
  966. userId: session.userId
  967. },
  968. this
  969. )
  970. .then(canView => {
  971. if (!canView) next("Not allowed to get station.");
  972. else next(null, station);
  973. })
  974. .catch(err => next(err));
  975. },
  976. (station, next) => {
  977. const playlists = [];
  978. async.eachLimit(
  979. station.blacklist,
  980. 1,
  981. (playlistId, next) => {
  982. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
  983. .then(playlist => {
  984. playlists.push(playlist);
  985. next();
  986. })
  987. .catch(() => {
  988. playlists.push(null);
  989. next();
  990. });
  991. },
  992. err => {
  993. next(err, playlists);
  994. }
  995. );
  996. }
  997. ],
  998. async (err, playlists) => {
  999. if (err) {
  1000. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1001. this.log(
  1002. "ERROR",
  1003. "GET_STATION_BLACKLIST_BY_ID",
  1004. `Getting station "${stationId}"'s blacklist failed. "${err}"`
  1005. );
  1006. return cb({ status: "error", message: err });
  1007. }
  1008. this.log(
  1009. "SUCCESS",
  1010. "GET_STATION_BLACKLIST_BY_ID",
  1011. `Got station "${stationId}"'s blacklist successfully.`
  1012. );
  1013. return cb({ status: "success", data: { playlists } });
  1014. }
  1015. );
  1016. },
  1017. /**
  1018. * Votes to skip a station
  1019. *
  1020. * @param session
  1021. * @param stationId - the station id
  1022. * @param cb
  1023. */
  1024. voteSkip: isLoginRequired(async function voteSkip(session, stationId, cb) {
  1025. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1026. let skipVotes = 0;
  1027. let shouldSkip = false;
  1028. async.waterfall(
  1029. [
  1030. next => {
  1031. StationsModule.runJob("GET_STATION", { stationId }, this)
  1032. .then(station => next(null, station))
  1033. .catch(next);
  1034. },
  1035. (station, next) => {
  1036. if (!station) return next("Station not found.");
  1037. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  1038. .then(canView => {
  1039. if (canView) return next(null, station);
  1040. return next("Insufficient permissions.");
  1041. })
  1042. .catch(err => next(err));
  1043. },
  1044. (station, next) => {
  1045. if (!station.currentSong) return next("There is currently no song to skip.");
  1046. if (station.currentSong.skipVotes.indexOf(session.userId) !== -1)
  1047. return next("You have already voted to skip this song.");
  1048. return next(null, station);
  1049. },
  1050. (station, next) => {
  1051. stationModel.updateOne(
  1052. { _id: stationId },
  1053. { $push: { "currentSong.skipVotes": session.userId } },
  1054. next
  1055. );
  1056. },
  1057. (res, next) => {
  1058. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1059. .then(station => {
  1060. next(null, station);
  1061. })
  1062. .catch(next);
  1063. },
  1064. (station, next) => {
  1065. if (!station) return next("Station not found.");
  1066. return next(null, station);
  1067. },
  1068. (station, next) => {
  1069. skipVotes = station.currentSong.skipVotes.length;
  1070. WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: `station.${stationId}` }, this)
  1071. .then(sockets => next(null, sockets))
  1072. .catch(next);
  1073. },
  1074. (sockets, next) => {
  1075. if (sockets.length <= skipVotes) {
  1076. shouldSkip = true;
  1077. return next();
  1078. }
  1079. const users = [];
  1080. return async.each(
  1081. sockets,
  1082. (socketId, next) => {
  1083. WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this)
  1084. .then(socket => {
  1085. if (socket && socket.session && socket.session.userId) {
  1086. if (!users.includes(socket.session.userId)) users.push(socket.session.userId);
  1087. }
  1088. return next();
  1089. })
  1090. .catch(next);
  1091. },
  1092. err => {
  1093. if (err) return next(err);
  1094. if (users.length <= skipVotes) shouldSkip = true;
  1095. return next();
  1096. }
  1097. );
  1098. }
  1099. ],
  1100. async err => {
  1101. if (err) {
  1102. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1103. this.log("ERROR", "STATIONS_VOTE_SKIP", `Vote skipping station "${stationId}" failed. "${err}"`);
  1104. return cb({ status: "error", message: err });
  1105. }
  1106. this.log("SUCCESS", "STATIONS_VOTE_SKIP", `Vote skipping "${stationId}" successful.`);
  1107. CacheModule.runJob("PUB", {
  1108. channel: "station.voteSkipSong",
  1109. value: stationId
  1110. });
  1111. if (shouldSkip) {
  1112. StationsModule.runJob("SKIP_STATION", { stationId, natural: false });
  1113. }
  1114. return cb({
  1115. status: "success",
  1116. message: "Successfully voted to skip the song."
  1117. });
  1118. }
  1119. );
  1120. }),
  1121. /**
  1122. * Force skips a station
  1123. *
  1124. * @param session
  1125. * @param stationId - the station id
  1126. * @param cb
  1127. */
  1128. forceSkip: isOwnerRequired(function forceSkip(session, stationId, cb) {
  1129. async.waterfall(
  1130. [
  1131. next => {
  1132. StationsModule.runJob("GET_STATION", { stationId }, this)
  1133. .then(station => {
  1134. next(null, station);
  1135. })
  1136. .catch(next);
  1137. },
  1138. (station, next) => {
  1139. if (!station) return next("Station not found.");
  1140. return next();
  1141. }
  1142. ],
  1143. async err => {
  1144. if (err) {
  1145. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1146. this.log("ERROR", "STATIONS_FORCE_SKIP", `Force skipping station "${stationId}" failed. "${err}"`);
  1147. return cb({ status: "error", message: err });
  1148. }
  1149. StationsModule.runJob("SKIP_STATION", { stationId, natural: false });
  1150. this.log("SUCCESS", "STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  1151. return cb({
  1152. status: "success",
  1153. message: "Successfully skipped station."
  1154. });
  1155. }
  1156. );
  1157. }),
  1158. /**
  1159. * Leaves the user's current station
  1160. *
  1161. * @param {object} session - user session
  1162. * @param {string} stationId - id of station to leave
  1163. * @param {Function} cb - callback
  1164. */
  1165. leave(session, stationId, cb) {
  1166. async.waterfall(
  1167. [
  1168. next => {
  1169. StationsModule.runJob("GET_STATION", { stationId }, this)
  1170. .then(station => next(null, station))
  1171. .catch(next);
  1172. },
  1173. (station, next) => {
  1174. if (!station) return next("Station not found.");
  1175. return next();
  1176. }
  1177. ],
  1178. async (err, userCount) => {
  1179. if (err) {
  1180. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1181. this.log("ERROR", "STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  1182. return cb({ status: "error", message: err });
  1183. }
  1184. this.log("SUCCESS", "STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  1185. WSModule.runJob("SOCKET_LEAVE_ROOM", { socketId: session.socketId, room: `station.${stationId}` });
  1186. WSModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", { sockets: [session.socketId] });
  1187. delete StationsModule.userList[session.socketId];
  1188. return cb({
  1189. status: "success",
  1190. message: "Successfully left station.",
  1191. data: { userCount }
  1192. });
  1193. }
  1194. );
  1195. },
  1196. /**
  1197. * Updates a station's settings
  1198. *
  1199. * @param session
  1200. * @param stationId - the station id
  1201. * @param station - updated station object
  1202. * @param cb
  1203. */
  1204. update: isOwnerRequired(async function update(session, stationId, newStation, cb) {
  1205. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1206. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  1207. async.waterfall(
  1208. [
  1209. next => {
  1210. stationModel.findOne({ _id: stationId }, next);
  1211. },
  1212. (previousStation, next) => {
  1213. stationModel.updateOne({ _id: stationId }, newStation, { runValidators: true }, err => {
  1214. next(err, previousStation);
  1215. });
  1216. },
  1217. (previousStation, next) => {
  1218. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1219. .then(station => next(null, station, previousStation))
  1220. .catch(next);
  1221. },
  1222. (station, previousStation, next) => {
  1223. if (newStation.autofill.enabled && !previousStation.autofill.enabled)
  1224. StationsModule.runJob("AUTOFILL_STATION", { stationId }, this)
  1225. .then(() => {
  1226. CacheModule.runJob("PUB", {
  1227. channel: "station.queueUpdate",
  1228. value: stationId
  1229. })
  1230. .then(() => next(null, station, previousStation))
  1231. .catch(next);
  1232. })
  1233. .catch(next);
  1234. else next(null, station, previousStation);
  1235. },
  1236. (station, previousStation, next) => {
  1237. playlistModel.updateOne(
  1238. { _id: station.playlist },
  1239. { $set: { displayName: `Station - ${station.displayName}` } },
  1240. err => {
  1241. next(err, station, previousStation);
  1242. }
  1243. );
  1244. }
  1245. ],
  1246. async (err, station, previousStation) => {
  1247. if (err) {
  1248. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1249. this.log("ERROR", "STATIONS_UPDATE", `Updating station "${stationId}" failed. "${err}"`);
  1250. return cb({ status: "error", message: err });
  1251. }
  1252. this.log("SUCCESS", "STATIONS_UPDATE", `Updated station "${stationId}" successfully.`);
  1253. CacheModule.runJob("PUB", {
  1254. channel: "station.updated",
  1255. value: { stationId, previousStation }
  1256. });
  1257. return cb({
  1258. status: "success",
  1259. message: "Successfully updated the station."
  1260. });
  1261. }
  1262. );
  1263. }),
  1264. /**
  1265. * Pauses a station
  1266. *
  1267. * @param session
  1268. * @param stationId - the station id
  1269. * @param cb
  1270. */
  1271. pause: isOwnerRequired(async function pause(session, stationId, cb) {
  1272. const stationModel = await DBModule.runJob(
  1273. "GET_MODEL",
  1274. {
  1275. modelName: "station"
  1276. },
  1277. this
  1278. );
  1279. async.waterfall(
  1280. [
  1281. next => {
  1282. StationsModule.runJob("GET_STATION", { stationId }, this)
  1283. .then(station => {
  1284. next(null, station);
  1285. })
  1286. .catch(next);
  1287. },
  1288. (station, next) => {
  1289. if (!station) return next("Station not found.");
  1290. if (station.paused) return next("That station was already paused.");
  1291. return stationModel.updateOne(
  1292. { _id: stationId },
  1293. { $set: { paused: true, pausedAt: Date.now() } },
  1294. next
  1295. );
  1296. },
  1297. (res, next) => {
  1298. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1299. .then(station => {
  1300. next(null, station);
  1301. })
  1302. .catch(next);
  1303. }
  1304. ],
  1305. async err => {
  1306. if (err) {
  1307. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1308. this.log("ERROR", "STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  1309. return cb({ status: "error", message: err });
  1310. }
  1311. this.log("SUCCESS", "STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  1312. CacheModule.runJob("PUB", {
  1313. channel: "station.pause",
  1314. value: stationId
  1315. });
  1316. NotificationsModule.runJob("UNSCHEDULE", {
  1317. name: `stations.nextSong?id=${stationId}`
  1318. });
  1319. return cb({
  1320. status: "success",
  1321. message: "Successfully paused."
  1322. });
  1323. }
  1324. );
  1325. }),
  1326. /**
  1327. * Resumes a station
  1328. *
  1329. * @param session
  1330. * @param stationId - the station id
  1331. * @param cb
  1332. */
  1333. resume: isOwnerRequired(async function resume(session, stationId, cb) {
  1334. const stationModel = await DBModule.runJob(
  1335. "GET_MODEL",
  1336. {
  1337. modelName: "station"
  1338. },
  1339. this
  1340. );
  1341. async.waterfall(
  1342. [
  1343. next => {
  1344. StationsModule.runJob("GET_STATION", { stationId }, this)
  1345. .then(station => {
  1346. next(null, station);
  1347. })
  1348. .catch(next);
  1349. },
  1350. (station, next) => {
  1351. if (!station) return next("Station not found.");
  1352. if (!station.paused) return next("That station is not paused.");
  1353. station.timePaused += Date.now() - station.pausedAt;
  1354. return stationModel.updateOne(
  1355. { _id: stationId },
  1356. {
  1357. $set: { paused: false },
  1358. $inc: { timePaused: Date.now() - station.pausedAt }
  1359. },
  1360. next
  1361. );
  1362. },
  1363. (res, next) => {
  1364. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1365. .then(station => {
  1366. next(null, station);
  1367. })
  1368. .catch(next);
  1369. }
  1370. ],
  1371. async err => {
  1372. if (err) {
  1373. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1374. this.log("ERROR", "STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  1375. return cb({ status: "error", message: err });
  1376. }
  1377. this.log("SUCCESS", "STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  1378. CacheModule.runJob("PUB", {
  1379. channel: "station.resume",
  1380. value: stationId
  1381. });
  1382. return cb({
  1383. status: "success",
  1384. message: "Successfully resumed."
  1385. });
  1386. }
  1387. );
  1388. }),
  1389. /**
  1390. * Removes a station
  1391. *
  1392. * @param session
  1393. * @param stationId - the station id
  1394. * @param cb
  1395. */
  1396. remove: isOwnerRequired(async function remove(session, stationId, cb) {
  1397. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1398. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  1399. async.waterfall(
  1400. [
  1401. next => {
  1402. stationModel.findById(stationId, (err, station) => {
  1403. if (err) return next(err);
  1404. return next(null, station);
  1405. });
  1406. },
  1407. (station, next) => {
  1408. stationModel.deleteOne({ _id: stationId }, err => next(err, station));
  1409. },
  1410. (station, next) => {
  1411. CacheModule.runJob("HDEL", { table: "stations", key: stationId }, this)
  1412. .then(() => next(null, station))
  1413. .catch(next);
  1414. },
  1415. // remove the playlist for the station
  1416. (station, next) => {
  1417. if (station.playlist)
  1418. PlaylistsModule.runJob("DELETE_PLAYLIST", { playlistId: station.playlist })
  1419. .then(() => {})
  1420. .catch(next);
  1421. next(null, station);
  1422. },
  1423. // remove reference to the station id in any array of a user's favorite stations
  1424. (station, next) => {
  1425. userModel.updateMany(
  1426. { favoriteStations: stationId },
  1427. { $pull: { favoriteStations: stationId } },
  1428. err => next(err, station)
  1429. );
  1430. }
  1431. ],
  1432. async (err, station) => {
  1433. if (err) {
  1434. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1435. this.log("ERROR", "STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  1436. return cb({ status: "error", message: err });
  1437. }
  1438. this.log("SUCCESS", "STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  1439. CacheModule.runJob("PUB", {
  1440. channel: "station.remove",
  1441. value: stationId
  1442. });
  1443. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1444. userId: session.userId,
  1445. type: "station__remove",
  1446. payload: { message: `Removed a station named ${station.displayName}` }
  1447. });
  1448. ActivitiesModule.runJob("REMOVE_ACTIVITY_REFERENCES", { type: "stationId", stationId });
  1449. return cb({
  1450. status: "success",
  1451. message: "Successfully removed."
  1452. });
  1453. }
  1454. );
  1455. }),
  1456. /**
  1457. * Create a station
  1458. *
  1459. * @param session
  1460. * @param data - the station data
  1461. * @param cb
  1462. */
  1463. create: isLoginRequired(async function create(session, data, cb) {
  1464. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  1465. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1466. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  1467. data.name = data.name.toLowerCase();
  1468. let blacklist = [
  1469. "about",
  1470. "support",
  1471. "staff",
  1472. "help",
  1473. "news",
  1474. "terms",
  1475. "privacy",
  1476. "profile",
  1477. "c",
  1478. "community",
  1479. "tos",
  1480. "login",
  1481. "register",
  1482. "p",
  1483. "official",
  1484. "o",
  1485. "faq",
  1486. "team",
  1487. "donate",
  1488. "buy",
  1489. "shop",
  1490. "forums",
  1491. "explore",
  1492. "settings",
  1493. "admin",
  1494. "auth",
  1495. "reset_password",
  1496. "backend",
  1497. "api",
  1498. "songs",
  1499. "playlists",
  1500. "playlist",
  1501. "albums",
  1502. "artists",
  1503. "artist",
  1504. "station"
  1505. ];
  1506. if (data.type === "community" && config.has("blacklistedCommunityStationNames"))
  1507. blacklist = [...blacklist, ...config.get("blacklistedCommunityStationNames")];
  1508. async.waterfall(
  1509. [
  1510. next => {
  1511. if (!data) return next("Invalid data.");
  1512. return next();
  1513. },
  1514. next => {
  1515. stationModel.findOne(
  1516. {
  1517. $or: [{ name: data.name }, { displayName: new RegExp(`^${data.displayName}$`, "i") }]
  1518. },
  1519. next
  1520. );
  1521. },
  1522. (station, next) => {
  1523. this.log(station);
  1524. if (station) return next("A station with that name or display name already exists.");
  1525. if (blacklist.indexOf(data.name) !== -1)
  1526. return next("That name is blacklisted. Please use a different name.");
  1527. if (data.type === "official") {
  1528. return userModel.findOne({ _id: session.userId }, (err, user) => {
  1529. if (err) return next(err);
  1530. if (!user) return next("User not found.");
  1531. if (user.role !== "admin") return next("Admin required.");
  1532. return next();
  1533. });
  1534. }
  1535. return next();
  1536. },
  1537. next => {
  1538. const stationId = mongoose.Types.ObjectId();
  1539. playlistModel.create(
  1540. {
  1541. displayName: `Station - ${data.name}`,
  1542. songs: [],
  1543. createdBy: data.type === "official" ? "Musare" : session.userId,
  1544. createdFor: `${stationId}`,
  1545. createdAt: Date.now(),
  1546. type: "station"
  1547. },
  1548. (err, playlist) => {
  1549. next(err, playlist, stationId);
  1550. }
  1551. );
  1552. },
  1553. (playlist, stationId, next) => {
  1554. const { name, displayName, description, type } = data;
  1555. if (type === "official") {
  1556. stationModel.create(
  1557. {
  1558. _id: stationId,
  1559. name,
  1560. displayName,
  1561. description,
  1562. playlist: playlist._id,
  1563. type,
  1564. privacy: "private",
  1565. queue: [],
  1566. currentSong: null
  1567. },
  1568. next
  1569. );
  1570. } else {
  1571. stationModel.create(
  1572. {
  1573. _id: stationId,
  1574. name,
  1575. displayName,
  1576. description,
  1577. playlist: playlist._id,
  1578. type,
  1579. privacy: "private",
  1580. owner: session.userId,
  1581. queue: [],
  1582. currentSong: null
  1583. },
  1584. next
  1585. );
  1586. }
  1587. }
  1588. ],
  1589. async (err, station) => {
  1590. if (err) {
  1591. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1592. this.log("ERROR", "STATIONS_CREATE", `Creating station failed. "${err}"`);
  1593. cb({ status: "error", message: err });
  1594. } else {
  1595. this.log("SUCCESS", "STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  1596. CacheModule.runJob("PUB", {
  1597. channel: "station.create",
  1598. value: station._id
  1599. });
  1600. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1601. userId: session.userId,
  1602. type: "station__create",
  1603. payload: {
  1604. message: `Created a station named <stationId>${station.displayName}</stationId>`,
  1605. stationId: station._id
  1606. }
  1607. });
  1608. cb({
  1609. status: "success",
  1610. message: "Successfully created station."
  1611. });
  1612. }
  1613. }
  1614. );
  1615. }),
  1616. /**
  1617. * Adds song to station queue
  1618. *
  1619. * @param session
  1620. * @param stationId - the station id
  1621. * @param youtubeId - the song id
  1622. * @param cb
  1623. */
  1624. addToQueue: isLoginRequired(async function addToQueue(session, stationId, youtubeId, cb) {
  1625. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  1626. const stationModel = await DBModule.runJob(
  1627. "GET_MODEL",
  1628. {
  1629. modelName: "station"
  1630. },
  1631. this
  1632. );
  1633. async.waterfall(
  1634. [
  1635. next => {
  1636. StationsModule.runJob("GET_STATION", { stationId }, this)
  1637. .then(station => {
  1638. next(null, station);
  1639. })
  1640. .catch(next);
  1641. },
  1642. (station, next) => {
  1643. if (!station) return next("Station not found.");
  1644. if (!station.requests.enabled) return next("Requests are disabled in this station.");
  1645. if (
  1646. station.requests.access === "owner" ||
  1647. (station.requests.access === "user" && station.privacy === "private")
  1648. ) {
  1649. return userModel.findOne({ _id: session.userId }, (err, user) => {
  1650. if (err) return next(err);
  1651. if (user.role !== "admin" && station.owner !== session.userId)
  1652. return next("You do not have permission to add songs to queue.");
  1653. return next(null, station);
  1654. });
  1655. }
  1656. return next(null, station);
  1657. },
  1658. (station, next) =>
  1659. StationsModule.runJob(
  1660. "CAN_USER_VIEW_STATION",
  1661. {
  1662. station,
  1663. userId: session.userId
  1664. },
  1665. this
  1666. )
  1667. .then(canView => {
  1668. if (canView) return next(null, station);
  1669. return next("Insufficient permissions.");
  1670. })
  1671. .catch(err => next(err)),
  1672. (station, next) => {
  1673. if (station.currentSong && station.currentSong.youtubeId === youtubeId)
  1674. return next("That song is currently playing.");
  1675. return async.each(
  1676. station.queue,
  1677. (queueSong, next) => {
  1678. if (queueSong.youtubeId === youtubeId) return next("That song is already in the queue.");
  1679. return next();
  1680. },
  1681. err => next(err, station)
  1682. );
  1683. },
  1684. (station, next) => {
  1685. DBModule.runJob("GET_MODEL", { modelName: "user" }, this)
  1686. .then(UserModel => {
  1687. UserModel.findOne(
  1688. { _id: session.userId },
  1689. { "preferences.anonymousSongRequests": 1 },
  1690. (err, user) => next(err, station, user)
  1691. );
  1692. })
  1693. .catch(next);
  1694. },
  1695. (station, user, next) => {
  1696. SongsModule.runJob(
  1697. "ENSURE_SONG_EXISTS_BY_YOUTUBE_ID",
  1698. {
  1699. youtubeId,
  1700. userId: user.preferences.anonymousSongRequests ? null : session.userId,
  1701. automaticallyRequested: true
  1702. },
  1703. this
  1704. )
  1705. .then(response => {
  1706. const { song } = response;
  1707. const { _id, title, skipDuration, artists, thumbnail, duration, verified } = song;
  1708. next(
  1709. null,
  1710. {
  1711. _id,
  1712. youtubeId,
  1713. title,
  1714. skipDuration,
  1715. artists,
  1716. thumbnail,
  1717. duration,
  1718. verified
  1719. },
  1720. station
  1721. );
  1722. })
  1723. .catch(next);
  1724. },
  1725. (song, station, next) => {
  1726. const blacklist = [];
  1727. async.eachLimit(
  1728. station.blacklist,
  1729. 1,
  1730. (playlistId, next) => {
  1731. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
  1732. .then(playlist => {
  1733. blacklist.push(playlist);
  1734. next();
  1735. })
  1736. .catch(next);
  1737. },
  1738. err => {
  1739. next(err, song, station, blacklist);
  1740. }
  1741. );
  1742. },
  1743. (song, station, blacklist, next) => {
  1744. const blacklistedSongs = blacklist
  1745. .flatMap(blacklistedPlaylist => blacklistedPlaylist.songs)
  1746. .reduce(
  1747. (items, item) =>
  1748. items.find(x => x.youtubeId === item.youtubeId) ? [...items] : [...items, item],
  1749. []
  1750. );
  1751. if (
  1752. blacklistedSongs.find(blacklistedSong => blacklistedSong._id.toString() === song._id.toString())
  1753. )
  1754. next("That song is in an blacklisted playlist and cannot be played.");
  1755. else next(null, song, station);
  1756. },
  1757. (song, station, next) => {
  1758. song.requestedBy = session.userId;
  1759. song.requestedAt = Date.now();
  1760. return next(null, song, station);
  1761. },
  1762. (song, station, next) => {
  1763. if (station.queue.length === 0) return next(null, song);
  1764. let totalSongs = 0;
  1765. station.queue.forEach(song => {
  1766. if (session.userId === song.requestedBy) {
  1767. totalSongs += 1;
  1768. }
  1769. });
  1770. if (totalSongs >= station.requests.limit)
  1771. return next(`The max amount of songs per user is ${station.requests.limit}.`);
  1772. return next(null, song);
  1773. },
  1774. // (song, station, next) => {
  1775. // song.requestedBy = session.userId;
  1776. // song.requestedAt = Date.now();
  1777. // let totalDuration = 0;
  1778. // station.queue.forEach(song => {
  1779. // totalDuration += song.duration;
  1780. // });
  1781. // if (totalDuration >= 3600 * 3) return next("The max length of the queue is 3 hours.");
  1782. // return next(null, song, station);
  1783. // },
  1784. // (song, station, next) => {
  1785. // if (station.queue.length === 0) return next(null, song, station);
  1786. // let totalDuration = 0;
  1787. // const userId = station.queue[station.queue.length - 1].requestedBy;
  1788. // station.queue.forEach(song => {
  1789. // if (userId === song.requestedBy) {
  1790. // totalDuration += song.duration;
  1791. // }
  1792. // });
  1793. // if (totalDuration >= 900) return next("The max length of songs per user is 15 minutes.");
  1794. // return next(null, song, station);
  1795. // },
  1796. // (song, station, next) => {
  1797. // if (station.queue.length === 0) return next(null, song);
  1798. // let totalSongs = 0;
  1799. // const userId = station.queue[station.queue.length - 1].requestedBy;
  1800. // station.queue.forEach(song => {
  1801. // if (userId === song.requestedBy) {
  1802. // totalSongs += 1;
  1803. // }
  1804. // });
  1805. // if (totalSongs <= 2) return next(null, song);
  1806. // if (totalSongs > 3)
  1807. // return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  1808. // if (
  1809. // station.queue[station.queue.length - 2].requestedBy !== userId ||
  1810. // station.queue[station.queue.length - 3] !== userId
  1811. // )
  1812. // return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  1813. // return next(null, song);
  1814. // },
  1815. (song, next) => {
  1816. stationModel.updateOne(
  1817. { _id: stationId },
  1818. { $push: { queue: song } },
  1819. { runValidators: true },
  1820. next
  1821. );
  1822. },
  1823. (res, next) => {
  1824. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1825. .then(station => next(null, station))
  1826. .catch(next);
  1827. }
  1828. ],
  1829. async err => {
  1830. if (err) {
  1831. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1832. this.log(
  1833. "ERROR",
  1834. "STATIONS_ADD_SONG_TO_QUEUE",
  1835. `Adding song "${youtubeId}" to station "${stationId}" queue failed. "${err}"`
  1836. );
  1837. return cb({ status: "error", message: err });
  1838. }
  1839. this.log(
  1840. "SUCCESS",
  1841. "STATIONS_ADD_SONG_TO_QUEUE",
  1842. `Added song "${youtubeId}" to station "${stationId}" successfully.`
  1843. );
  1844. CacheModule.runJob("PUB", {
  1845. channel: "station.queueUpdate",
  1846. value: stationId
  1847. });
  1848. return cb({
  1849. status: "success",
  1850. message: "Successfully added song to queue."
  1851. });
  1852. }
  1853. );
  1854. }),
  1855. /**
  1856. * Removes song from station queue
  1857. *
  1858. * @param session
  1859. * @param stationId - the station id
  1860. * @param youtubeId - the youtube id
  1861. * @param cb
  1862. */
  1863. removeFromQueue: isOwnerRequired(async function removeFromQueue(session, stationId, youtubeId, cb) {
  1864. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1865. async.waterfall(
  1866. [
  1867. next => {
  1868. if (!youtubeId) return next("Invalid youtube id.");
  1869. return StationsModule.runJob("GET_STATION", { stationId }, this)
  1870. .then(station => next(null, station))
  1871. .catch(next);
  1872. },
  1873. (station, next) => {
  1874. if (!station) return next("Station not found.");
  1875. return async.each(
  1876. station.queue,
  1877. (queueSong, next) => {
  1878. if (queueSong.youtubeId === youtubeId) return next(true);
  1879. return next();
  1880. },
  1881. err => {
  1882. if (err === true) return next();
  1883. return next("Song is not currently in the queue.");
  1884. }
  1885. );
  1886. },
  1887. next => {
  1888. stationModel.updateOne({ _id: stationId }, { $pull: { queue: { youtubeId } } }, next);
  1889. },
  1890. (res, next) => {
  1891. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1892. .then(station => next(null, station))
  1893. .catch(next);
  1894. }
  1895. ],
  1896. async err => {
  1897. if (err) {
  1898. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1899. this.log(
  1900. "ERROR",
  1901. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1902. `Removing song "${youtubeId}" from station "${stationId}" queue failed. "${err}"`
  1903. );
  1904. return cb({ status: "error", message: err });
  1905. }
  1906. this.log(
  1907. "SUCCESS",
  1908. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1909. `Removed song "${youtubeId}" from station "${stationId}" successfully.`
  1910. );
  1911. CacheModule.runJob("PUB", {
  1912. channel: "station.queueUpdate",
  1913. value: stationId
  1914. });
  1915. return cb({
  1916. status: "success",
  1917. message: "Successfully removed song from queue."
  1918. });
  1919. }
  1920. );
  1921. }),
  1922. /**
  1923. * Gets the queue from a station
  1924. *
  1925. * @param {object} session - user session
  1926. * @param {string} stationId - the station id
  1927. * @param {Function} cb - callback
  1928. */
  1929. getQueue(session, stationId, cb) {
  1930. async.waterfall(
  1931. [
  1932. next => {
  1933. StationsModule.runJob("GET_STATION", { stationId }, this)
  1934. .then(station => next(null, station))
  1935. .catch(next);
  1936. },
  1937. (station, next) => {
  1938. if (!station) return next("Station not found.");
  1939. return next(null, station);
  1940. },
  1941. (station, next) => {
  1942. StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  1943. .then(canView => {
  1944. if (canView) return next(null, station);
  1945. return next("Insufficient permissions.");
  1946. })
  1947. .catch(err => next(err));
  1948. },
  1949. (station, next) => next(null, station.queue)
  1950. ],
  1951. async (err, queue) => {
  1952. if (err) {
  1953. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1954. this.log(
  1955. "ERROR",
  1956. "STATIONS_GET_QUEUE",
  1957. `Getting queue for station "${stationId}" failed. "${err}"`
  1958. );
  1959. return cb({ status: "error", message: err });
  1960. }
  1961. this.log("SUCCESS", "STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  1962. return cb({
  1963. status: "success",
  1964. message: "Successfully got queue.",
  1965. data: { queue }
  1966. });
  1967. }
  1968. );
  1969. },
  1970. /**
  1971. * Reposition a song in station queue
  1972. *
  1973. * @param {object} session - user session
  1974. * @param {object} song - contains details about the song that is to be repositioned
  1975. * @param {string} song.youtubeId - the youtube id of the song
  1976. * @param {number} song.newIndex - the new position for the song in the queue
  1977. * @param {number} song.oldIndex - the old position of the song in the queue
  1978. * @param {string} stationId - the station id
  1979. * @param {Function} cb - callback
  1980. */
  1981. repositionSongInQueue: isOwnerRequired(async function repositionQueue(session, stationId, song, cb) {
  1982. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1983. async.waterfall(
  1984. [
  1985. next => {
  1986. if (!song || !song.youtubeId) return next("You must provide a song to reposition.");
  1987. return next();
  1988. },
  1989. // remove song from queue
  1990. next => {
  1991. stationModel.updateOne(
  1992. { _id: stationId },
  1993. { $pull: { queue: { youtubeId: song.youtubeId } } },
  1994. next
  1995. );
  1996. },
  1997. // add song back to queue (in new position)
  1998. (res, next) => {
  1999. stationModel.updateOne(
  2000. { _id: stationId },
  2001. { $push: { queue: { $each: [song], $position: song.newIndex } } },
  2002. err => next(err)
  2003. );
  2004. },
  2005. // update the cache representation of the station
  2006. next => {
  2007. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2008. .then(station => next(null, station))
  2009. .catch(next);
  2010. }
  2011. ],
  2012. async err => {
  2013. if (err) {
  2014. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2015. this.log(
  2016. "ERROR",
  2017. "STATIONS_REPOSITION_SONG_IN_QUEUE",
  2018. `Repositioning song ${song.youtubeId} in queue of station "${stationId}" failed. "${err}"`
  2019. );
  2020. return cb({ status: "error", message: err });
  2021. }
  2022. this.log(
  2023. "SUCCESS",
  2024. "STATIONS_REPOSITION_SONG_IN_QUEUE",
  2025. `Repositioned song ${song.youtubeId} in queue of station "${stationId}" successfully.`
  2026. );
  2027. CacheModule.runJob("PUB", {
  2028. channel: "station.repositionSongInQueue",
  2029. value: {
  2030. song: {
  2031. youtubeId: song.youtubeId,
  2032. oldIndex: song.oldIndex,
  2033. newIndex: song.newIndex
  2034. },
  2035. stationId
  2036. }
  2037. });
  2038. return cb({
  2039. status: "success",
  2040. message: "Successfully repositioned song in queue."
  2041. });
  2042. }
  2043. );
  2044. }),
  2045. /**
  2046. * Autofill a playlist in a station
  2047. *
  2048. * @param session
  2049. * @param stationId - the station id
  2050. * @param playlistId - the playlist id
  2051. * @param cb
  2052. */
  2053. autofillPlaylist: isOwnerRequired(async function autofillPlaylist(session, stationId, playlistId, cb) {
  2054. async.waterfall(
  2055. [
  2056. next => {
  2057. StationsModule.runJob("GET_STATION", { stationId }, this)
  2058. .then(station => next(null, station))
  2059. .catch(next);
  2060. },
  2061. (station, next) => {
  2062. if (!station) return next("Station not found.");
  2063. if (station.autofill.playlists.indexOf(playlistId) !== -1)
  2064. return next("That playlist is already autofilling.");
  2065. if (station.autofill.mode === "sequential" && station.autofill.playlists.length > 0)
  2066. return next("Error: Only 1 playlist can be autofilling in sequential mode.");
  2067. return next();
  2068. },
  2069. next => {
  2070. StationsModule.runJob("AUTOFILL_PLAYLIST", { stationId, playlistId }, this)
  2071. .then(() => {
  2072. next();
  2073. })
  2074. .catch(next);
  2075. }
  2076. ],
  2077. async err => {
  2078. if (err) {
  2079. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2080. this.log(
  2081. "ERROR",
  2082. "STATIONS_AUTOFILL_PLAYLIST",
  2083. `Including playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2084. );
  2085. return cb({ status: "error", message: err });
  2086. }
  2087. this.log(
  2088. "SUCCESS",
  2089. "STATIONS_AUTOFILL_PLAYLIST",
  2090. `Including playlist "${playlistId}" for station "${stationId}" successfully.`
  2091. );
  2092. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  2093. CacheModule.runJob("PUB", {
  2094. channel: "station.autofillPlaylist",
  2095. value: {
  2096. playlistId,
  2097. stationId
  2098. }
  2099. });
  2100. return cb({
  2101. status: "success",
  2102. message: "Successfully added autofill playlist."
  2103. });
  2104. }
  2105. );
  2106. }),
  2107. /**
  2108. * Remove autofilled playlist from a station
  2109. *
  2110. * @param session
  2111. * @param stationId - the station id
  2112. * @param playlistId - the playlist id
  2113. * @param cb
  2114. */
  2115. removeAutofillPlaylist: isOwnerRequired(async function removeAutofillPlaylist(session, stationId, playlistId, cb) {
  2116. async.waterfall(
  2117. [
  2118. next => {
  2119. StationsModule.runJob("GET_STATION", { stationId }, this)
  2120. .then(station => next(null, station))
  2121. .catch(next);
  2122. },
  2123. (station, next) => {
  2124. if (!station) return next("Station not found.");
  2125. if (station.autofill.playlists.indexOf(playlistId) === -1)
  2126. return next("That playlist is not autofilling.");
  2127. return next();
  2128. },
  2129. next => {
  2130. StationsModule.runJob("REMOVE_AUTOFILL_PLAYLIST", { stationId, playlistId }, this)
  2131. .then(() => {
  2132. next();
  2133. })
  2134. .catch(next);
  2135. }
  2136. ],
  2137. async err => {
  2138. if (err) {
  2139. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2140. this.log(
  2141. "ERROR",
  2142. "STATIONS_REMOVE_AUTOFILL_PLAYLIST",
  2143. `Removing autofill playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2144. );
  2145. return cb({ status: "error", message: err });
  2146. }
  2147. this.log(
  2148. "SUCCESS",
  2149. "STATIONS_REMOVE_AUTOFILL_PLAYLIST",
  2150. `Removing autofill playlist "${playlistId}" for station "${stationId}" successfully.`
  2151. );
  2152. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  2153. CacheModule.runJob("PUB", {
  2154. channel: "station.removedAutofillPlaylist",
  2155. value: {
  2156. playlistId,
  2157. stationId
  2158. }
  2159. });
  2160. return cb({
  2161. status: "success",
  2162. message: "Successfully removed autofill playlist."
  2163. });
  2164. }
  2165. );
  2166. }),
  2167. /**
  2168. * Blacklist a playlist in a station
  2169. *
  2170. * @param session
  2171. * @param stationId - the station id
  2172. * @param playlistId - the playlist id
  2173. * @param cb
  2174. */
  2175. blacklistPlaylist: isOwnerRequired(async function blacklistPlaylist(session, stationId, playlistId, cb) {
  2176. async.waterfall(
  2177. [
  2178. next => {
  2179. StationsModule.runJob("GET_STATION", { stationId }, this)
  2180. .then(station => next(null, station))
  2181. .catch(next);
  2182. },
  2183. (station, next) => {
  2184. if (!station) return next("Station not found.");
  2185. if (station.blacklist.indexOf(playlistId) !== -1)
  2186. return next("That playlist is already blacklisted.");
  2187. return next();
  2188. },
  2189. next => {
  2190. StationsModule.runJob("BLACKLIST_PLAYLIST", { stationId, playlistId }, this)
  2191. .then(() => {
  2192. next();
  2193. })
  2194. .catch(next);
  2195. }
  2196. ],
  2197. async err => {
  2198. if (err) {
  2199. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2200. this.log(
  2201. "ERROR",
  2202. "STATIONS_BLACKLIST_PLAYLIST",
  2203. `Blacklisting playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2204. );
  2205. return cb({ status: "error", message: err });
  2206. }
  2207. this.log(
  2208. "SUCCESS",
  2209. "STATIONS_BLACKLIST_PLAYLIST",
  2210. `Blacklisting playlist "${playlistId}" for station "${stationId}" successfully.`
  2211. );
  2212. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  2213. CacheModule.runJob("PUB", {
  2214. channel: "station.blacklistedPlaylist",
  2215. value: {
  2216. playlistId,
  2217. stationId
  2218. }
  2219. });
  2220. return cb({
  2221. status: "success",
  2222. message: "Successfully blacklisted playlist."
  2223. });
  2224. }
  2225. );
  2226. }),
  2227. /**
  2228. * Remove blacklisted a playlist from a station
  2229. *
  2230. * @param session
  2231. * @param stationId - the station id
  2232. * @param playlistId - the playlist id
  2233. * @param cb
  2234. */
  2235. removeBlacklistedPlaylist: isOwnerRequired(async function removeBlacklistedPlaylist(
  2236. session,
  2237. stationId,
  2238. playlistId,
  2239. cb
  2240. ) {
  2241. async.waterfall(
  2242. [
  2243. next => {
  2244. StationsModule.runJob("GET_STATION", { stationId }, this)
  2245. .then(station => next(null, station))
  2246. .catch(next);
  2247. },
  2248. (station, next) => {
  2249. if (!station) return next("Station not found.");
  2250. if (station.blacklist.indexOf(playlistId) === -1) return next("That playlist is not blacklisted.");
  2251. return next();
  2252. },
  2253. next => {
  2254. StationsModule.runJob("REMOVE_BLACKLISTED_PLAYLIST", { stationId, playlistId }, this)
  2255. .then(() => {
  2256. next();
  2257. })
  2258. .catch(next);
  2259. }
  2260. ],
  2261. async err => {
  2262. if (err) {
  2263. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2264. this.log(
  2265. "ERROR",
  2266. "STATIONS_REMOVE_BLACKLISTED_PLAYLIST",
  2267. `Removing blacklisted playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  2268. );
  2269. return cb({ status: "error", message: err });
  2270. }
  2271. this.log(
  2272. "SUCCESS",
  2273. "STATIONS_REMOVE_BLACKLISTED_PLAYLIST",
  2274. `Removing blacklisted playlist "${playlistId}" for station "${stationId}" successfully.`
  2275. );
  2276. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  2277. CacheModule.runJob("PUB", {
  2278. channel: "station.removedBlacklistedPlaylist",
  2279. value: {
  2280. playlistId,
  2281. stationId
  2282. }
  2283. });
  2284. return cb({
  2285. status: "success",
  2286. message: "Successfully removed blacklisted playlist."
  2287. });
  2288. }
  2289. );
  2290. }),
  2291. favoriteStation: isLoginRequired(async function favoriteStation(session, stationId, cb) {
  2292. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  2293. async.waterfall(
  2294. [
  2295. next => {
  2296. StationsModule.runJob("GET_STATION", { stationId }, this)
  2297. .then(station => next(null, station))
  2298. .catch(next);
  2299. },
  2300. (station, next) => {
  2301. if (!station) return next("Station not found.");
  2302. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  2303. .then(canView => {
  2304. if (canView) return next(null, station);
  2305. return next("Insufficient permissions.");
  2306. })
  2307. .catch(err => next(err));
  2308. },
  2309. (station, next) => {
  2310. userModel.updateOne(
  2311. { _id: session.userId },
  2312. { $addToSet: { favoriteStations: stationId } },
  2313. (err, res) => next(err, station, res)
  2314. );
  2315. },
  2316. (station, res, next) => {
  2317. if (res.nModified === 0) return next("The station was already favorited.");
  2318. return next(null, station);
  2319. }
  2320. ],
  2321. async (err, station) => {
  2322. if (err) {
  2323. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2324. this.log("ERROR", "FAVORITE_STATION", `Favoriting station "${stationId}" failed. "${err}"`);
  2325. return cb({ status: "error", message: err });
  2326. }
  2327. this.log("SUCCESS", "FAVORITE_STATION", `Favorited station "${stationId}" successfully.`);
  2328. CacheModule.runJob("PUB", {
  2329. channel: "user.favoritedStation",
  2330. value: {
  2331. userId: session.userId,
  2332. stationId
  2333. }
  2334. });
  2335. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2336. userId: session.userId,
  2337. type: "station__favorite",
  2338. payload: {
  2339. message: `Favorited station <stationId>${station.displayName}</stationId>`,
  2340. stationId
  2341. }
  2342. });
  2343. return cb({
  2344. status: "success",
  2345. message: "Succesfully favorited station."
  2346. });
  2347. }
  2348. );
  2349. }),
  2350. unfavoriteStation: isLoginRequired(async function unfavoriteStation(session, stationId, cb) {
  2351. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  2352. async.waterfall(
  2353. [
  2354. next => {
  2355. userModel.updateOne({ _id: session.userId }, { $pull: { favoriteStations: stationId } }, next);
  2356. },
  2357. (res, next) => {
  2358. if (res.nModified === 0) return next("The station wasn't favorited.");
  2359. return next();
  2360. },
  2361. next => {
  2362. StationsModule.runJob("GET_STATION", { stationId }, this)
  2363. .then(station => next(null, station))
  2364. .catch(next);
  2365. }
  2366. ],
  2367. async (err, station) => {
  2368. if (err) {
  2369. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2370. this.log("ERROR", "UNFAVORITE_STATION", `Unfavoriting station "${stationId}" failed. "${err}"`);
  2371. return cb({ status: "error", message: err });
  2372. }
  2373. this.log("SUCCESS", "UNFAVORITE_STATION", `Unfavorited station "${stationId}" successfully.`);
  2374. CacheModule.runJob("PUB", {
  2375. channel: "user.unfavoritedStation",
  2376. value: {
  2377. userId: session.userId,
  2378. stationId
  2379. }
  2380. });
  2381. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2382. userId: session.userId,
  2383. type: "station__unfavorite",
  2384. payload: {
  2385. message: `Unfavorited station <stationId>${station.displayName}</stationId>`,
  2386. stationId
  2387. }
  2388. });
  2389. return cb({
  2390. status: "success",
  2391. message: "Succesfully unfavorited station."
  2392. });
  2393. }
  2394. );
  2395. }),
  2396. /**
  2397. * Clears every station queue
  2398. *
  2399. * @param {object} session - the session object automatically added by socket.io
  2400. * @param {Function} cb - gets called with the result
  2401. */
  2402. clearEveryStationQueue: isAdminRequired(async function clearEveryStationQueue(session, cb) {
  2403. async.waterfall(
  2404. [
  2405. next => {
  2406. StationsModule.runJob("CLEAR_EVERY_STATION_QUEUE", {}, this)
  2407. .then(() => next())
  2408. .catch(next);
  2409. }
  2410. ],
  2411. async err => {
  2412. if (err) {
  2413. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2414. this.log("ERROR", "CLEAR_EVERY_STATION_QUEUE", `Clearing every station queue failed. "${err}"`);
  2415. return cb({ status: "error", message: err });
  2416. }
  2417. this.log("SUCCESS", "CLEAR_EVERY_STATION_QUEUE", "Clearing every station queue was successful.");
  2418. return cb({ status: "success", message: "Successfully cleared every station queue." });
  2419. }
  2420. );
  2421. }),
  2422. /**
  2423. * Reset a station queue
  2424. *
  2425. * @param {object} session - the session object automatically added by socket.io
  2426. * @param {string} stationId - the station id
  2427. * @param {Function} cb - gets called with the result
  2428. */
  2429. resetQueue: isAdminRequired(async function resetQueue(session, stationId, cb) {
  2430. async.waterfall(
  2431. [
  2432. next => {
  2433. StationsModule.runJob("RESET_QUEUE", { stationId }, this)
  2434. .then(() => next())
  2435. .catch(next);
  2436. }
  2437. ],
  2438. async err => {
  2439. if (err) {
  2440. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2441. this.log("ERROR", "RESET_QUEUE", `Resetting station queue failed. "${err}"`);
  2442. return cb({ status: "error", message: err });
  2443. }
  2444. this.log("SUCCESS", "RESET_QUEUE", "Resetting station queue was successful.");
  2445. return cb({ status: "success", message: "Successfully reset station queue." });
  2446. }
  2447. );
  2448. }),
  /**
   * Gets the number of skip votes for a song in a station.
   *
   * When `songId` matches the `_id` of the station's current song, the length of that song's `skipVotes`
   * is returned with `skipVotesCurrent: true`; otherwise `skipVotes` is 0 and `skipVotesCurrent` is false.
   * Wrapped by the `isLoginRequired` hook.
   *
   * @param {object} session - user session
   * @param {string} stationId - the station id
   * @param {string} songId - the song id to get skip votes for
   * @param {Function} cb - callback, called with `{ status, data: { skipVotes, skipVotesCurrent } }` on
   * success or `{ status, message }` on error
   */
  getSkipVotes: isLoginRequired(async function getSkipVotes(session, stationId, songId, cb) {
    async.waterfall(
      [
        next => {
          StationsModule.runJob("GET_STATION", { stationId }, this)
            .then(station => {
              if (!station) return next("Station not found.");
              return next(null, station.currentSong);
            })
            // Forward failures into the waterfall; merely logging them would leave the waterfall
            // unfinished and the caller would never receive a response.
            .catch(next);
        },

        (currentSong, next) => {
          if (currentSong && currentSong._id === songId)
            next(null, {
              skipVotes: currentSong.skipVotes.length,
              skipVotesCurrent: true
            });
          else
            next(null, {
              skipVotes: 0,
              skipVotesCurrent: false
            });
        }
      ],
      async (err, data) => {
        if (err) {
          err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
          this.log(
            "ERROR",
            "STATIONS_GET_SKIP_VOTES",
            `User "${session.userId}" failed to get skip votes for ${stationId}. "${err}"`
          );
          return cb({ status: "error", message: err });
        }

        const { skipVotes, skipVotesCurrent } = data;

        return cb({
          status: "success",
          data: {
            skipVotes,
            skipVotesCurrent
          }
        });
      }
    );
  })
  2499. };