stations.js 102 KB


  1. import async from "async";
  2. import mongoose from "mongoose";
  3. import config from "config";
  4. import { isLoginRequired, isOwnerRequired, isAdminRequired } from "./hooks";
  5. import moduleManager from "../../index";
  6. const DBModule = moduleManager.modules.db;
  7. const UtilsModule = moduleManager.modules.utils;
  8. const WSModule = moduleManager.modules.ws;
  9. const SongsModule = moduleManager.modules.songs;
  10. const PlaylistsModule = moduleManager.modules.playlists;
  11. const CacheModule = moduleManager.modules.cache;
  12. const NotificationsModule = moduleManager.modules.notifications;
  13. const StationsModule = moduleManager.modules.stations;
  14. const ActivitiesModule = moduleManager.modules.activities;
  15. CacheModule.runJob("SUB", {
  16. channel: "station.updateUsers",
  17. cb: ({ stationId, usersPerStation }) => {
  18. WSModule.runJob("EMIT_TO_ROOM", {
  19. room: `station.${stationId}`,
  20. args: ["event:station.users.updated", { data: { users: usersPerStation } }]
  21. });
  22. }
  23. });
  24. CacheModule.runJob("SUB", {
  25. channel: "station.updateUserCount",
  26. cb: ({ stationId, usersPerStationCount }) => {
  27. const count = usersPerStationCount || 0;
  28. WSModule.runJob("EMIT_TO_ROOM", {
  29. room: `station.${stationId}`,
  30. args: ["event:station.userCount.updated", { data: { userCount: count } }]
  31. });
  32. StationsModule.runJob("GET_STATION", { stationId }).then(async station => {
  33. if (station.privacy === "public")
  34. WSModule.runJob("EMIT_TO_ROOM", {
  35. room: "home",
  36. args: ["event:station.userCount.updated", { data: { stationId, userCount: count } }]
  37. });
  38. else {
  39. const sockets = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
  40. room: "home"
  41. });
  42. sockets.forEach(async socketId => {
  43. const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this);
  44. if (!socket) return;
  45. const { session } = socket;
  46. if (session.sessionId) {
  47. CacheModule.runJob("HGET", {
  48. table: "sessions",
  49. key: session.sessionId
  50. }).then(session => {
  51. if (session)
  52. DBModule.runJob(
  53. "GET_MODEL",
  54. {
  55. modelName: "user"
  56. },
  57. this
  58. ).then(userModel =>
  59. userModel.findOne({ _id: session.userId }, (err, user) => {
  60. if (user && user.role === "admin")
  61. socket.dispatch("event:station.userCount.updated", {
  62. data: { stationId, count }
  63. });
  64. else if (
  65. user &&
  66. station.type === "community" &&
  67. station.owner === session.userId
  68. )
  69. socket.dispatch("event:station.userCount.updated", {
  70. data: { stationId, count }
  71. });
  72. })
  73. );
  74. });
  75. }
  76. });
  77. }
  78. });
  79. }
  80. });
  81. CacheModule.runJob("SUB", {
  82. channel: "station.queueLockToggled",
  83. cb: data => {
  84. const { stationId, locked } = data;
  85. WSModule.runJob("EMIT_TO_ROOM", {
  86. room: `station.${stationId}`,
  87. args: ["event:station.queue.lock.toggled", { data: { locked } }]
  88. });
  89. WSModule.runJob("EMIT_TO_ROOM", {
  90. room: `manage-station.${stationId}`,
  91. args: ["event:station.queue.lock.toggled", { data: { stationId, locked } }]
  92. });
  93. }
  94. });
  95. CacheModule.runJob("SUB", {
  96. channel: "station.updatePartyMode",
  97. cb: data => {
  98. const { stationId, partyMode } = data;
  99. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  100. WSModule.runJob("EMIT_TO_ROOM", {
  101. room: `station.${stationId}`,
  102. args: ["event:station.partyMode.updated", { data: { partyMode } }]
  103. });
  104. WSModule.runJob("EMIT_TO_ROOM", {
  105. room: `manage-station.${stationId}`,
  106. args: ["event:station.partyMode.updated", { data: { stationId, partyMode } }]
  107. });
  108. StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
  109. room: `home`,
  110. station
  111. }).then(response => {
  112. const { socketsThatCan } = response;
  113. socketsThatCan.forEach(socket => {
  114. socket.dispatch("event:station.partyMode.updated", { data: { stationId, partyMode } });
  115. });
  116. });
  117. });
  118. }
  119. });
  120. CacheModule.runJob("SUB", {
  121. channel: "station.newPlayMode",
  122. cb: data => {
  123. const { stationId, playMode } = data;
  124. WSModule.runJob("EMIT_TO_ROOM", {
  125. room: `station.${stationId}`,
  126. args: ["event:station.playMode.updated", { data: { playMode } }]
  127. });
  128. WSModule.runJob("EMIT_TO_ROOM", {
  129. room: `manage-station.${stationId}`,
  130. args: ["event:station.playMode.updated", { data: { stationId, playMode } }]
  131. });
  132. }
  133. });
  134. CacheModule.runJob("SUB", {
  135. channel: "station.includedPlaylist",
  136. cb: data => {
  137. const { stationId, playlistId } = data;
  138. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }).then(playlist =>
  139. WSModule.runJob("EMIT_TO_ROOMS", {
  140. rooms: [`station.${stationId}`, `manage-station.${stationId}`],
  141. args: ["event:station.includedPlaylist", { data: { stationId, playlist } }]
  142. })
  143. );
  144. }
  145. });
  146. CacheModule.runJob("SUB", {
  147. channel: "station.excludedPlaylist",
  148. cb: data => {
  149. const { stationId, playlistId } = data;
  150. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }).then(playlist =>
  151. WSModule.runJob("EMIT_TO_ROOMS", {
  152. rooms: [`station.${stationId}`, `manage-station.${stationId}`],
  153. args: ["event:station.excludedPlaylist", { data: { stationId, playlist } }]
  154. })
  155. );
  156. }
  157. });
  158. CacheModule.runJob("SUB", {
  159. channel: "station.removedIncludedPlaylist",
  160. cb: data => {
  161. const { stationId, playlistId } = data;
  162. WSModule.runJob("EMIT_TO_ROOMS", {
  163. rooms: [`station.${stationId}`, `manage-station.${stationId}`],
  164. args: ["event:station.removedIncludedPlaylist", { data: { stationId, playlistId } }]
  165. });
  166. }
  167. });
  168. CacheModule.runJob("SUB", {
  169. channel: "station.removedExcludedPlaylist",
  170. cb: data => {
  171. const { stationId, playlistId } = data;
  172. WSModule.runJob("EMIT_TO_ROOMS", {
  173. rooms: [`station.${stationId}`, `manage-station.${stationId}`],
  174. args: ["event:station.removedExcludedPlaylist", { data: { stationId, playlistId } }]
  175. });
  176. }
  177. });
  178. CacheModule.runJob("SUB", {
  179. channel: "station.pause",
  180. cb: stationId => {
  181. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  182. WSModule.runJob("EMIT_TO_ROOM", {
  183. room: `station.${stationId}`,
  184. args: ["event:station.pause", { data: { pausedAt: station.pausedAt } }]
  185. });
  186. WSModule.runJob("EMIT_TO_ROOM", {
  187. room: `manage-station.${stationId}`,
  188. args: ["event:station.pause", { data: { stationId, pausedAt: station.pausedAt } }]
  189. });
  190. StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
  191. room: `home`,
  192. station
  193. }).then(response => {
  194. const { socketsThatCan } = response;
  195. socketsThatCan.forEach(socket => {
  196. socket.dispatch("event:station.pause", { data: { stationId } });
  197. });
  198. });
  199. });
  200. }
  201. });
  202. CacheModule.runJob("SUB", {
  203. channel: "station.resume",
  204. cb: stationId => {
  205. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  206. WSModule.runJob("EMIT_TO_ROOM", {
  207. room: `station.${stationId}`,
  208. args: ["event:station.resume", { data: { timePaused: station.timePaused } }]
  209. });
  210. WSModule.runJob("EMIT_TO_ROOM", {
  211. room: `manage-station.${stationId}`,
  212. args: ["event:station.resume", { data: { stationId, timePaused: station.timePaused } }]
  213. });
  214. StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
  215. room: `home`,
  216. station
  217. })
  218. .then(response => {
  219. const { socketsThatCan } = response;
  220. socketsThatCan.forEach(socket => {
  221. socket.dispatch("event:station.resume", { data: { stationId } });
  222. });
  223. })
  224. .catch(console.log);
  225. });
  226. }
  227. });
  228. CacheModule.runJob("SUB", {
  229. channel: "station.privacyUpdate",
  230. cb: response => {
  231. const { stationId, previousPrivacy } = response;
  232. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  233. if (previousPrivacy !== station.privacy) {
  234. if (station.privacy === "public") {
  235. // Station became public
  236. WSModule.runJob("EMIT_TO_ROOM", {
  237. room: "home",
  238. args: ["event:station.created", { data: { station } }]
  239. });
  240. } else if (previousPrivacy === "public") {
  241. // Station became hidden
  242. StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
  243. room: `home`,
  244. station
  245. }).then(response => {
  246. const { socketsThatCan, socketsThatCannot } = response;
  247. socketsThatCan.forEach(socket => {
  248. socket.dispatch("event:station.privacy.updated", {
  249. data: { stationId, privacy: station.privacy }
  250. });
  251. });
  252. socketsThatCannot.forEach(socket => {
  253. socket.dispatch("event:station.deleted", { data: { stationId } });
  254. });
  255. });
  256. } else {
  257. // Station was hidden and is still hidden
  258. StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
  259. room: `home`,
  260. station
  261. }).then(response => {
  262. const { socketsThatCan } = response;
  263. socketsThatCan.forEach(socket => {
  264. socket.dispatch("event:station.privacy.updated", {
  265. data: { stationId, privacy: station.privacy }
  266. });
  267. });
  268. });
  269. }
  270. }
  271. WSModule.runJob("EMIT_TO_ROOM", {
  272. room: `station.${stationId}`,
  273. args: ["event:station.privacy.updated", { data: { privacy: station.privacy } }]
  274. });
  275. WSModule.runJob("EMIT_TO_ROOM", {
  276. room: `manage-station.${stationId}`,
  277. args: ["event:station.privacy.updated", { data: { stationId, privacy: station.privacy } }]
  278. });
  279. });
  280. }
  281. });
  282. CacheModule.runJob("SUB", {
  283. channel: "station.nameUpdate",
  284. cb: res => {
  285. const { stationId, name } = res;
  286. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  287. StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
  288. room: `home`,
  289. station
  290. }).then(response => {
  291. const { socketsThatCan } = response;
  292. socketsThatCan.forEach(socket =>
  293. socket.dispatch("event:station.name.updated", { data: { stationId, name } })
  294. );
  295. });
  296. });
  297. WSModule.runJob("EMIT_TO_ROOMS", {
  298. rooms: [`station.${stationId}`, `manage-station.${stationId}`],
  299. args: ["event:station.name.updated", { data: { stationId, name } }]
  300. });
  301. }
  302. });
  303. CacheModule.runJob("SUB", {
  304. channel: "station.displayNameUpdate",
  305. cb: response => {
  306. const { stationId, displayName } = response;
  307. StationsModule.runJob("GET_STATION", { stationId }).then(station =>
  308. StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
  309. room: `home`,
  310. station
  311. }).then(response => {
  312. const { socketsThatCan } = response;
  313. socketsThatCan.forEach(socket =>
  314. socket.dispatch("event:station.displayName.updated", { data: { stationId, displayName } })
  315. );
  316. })
  317. );
  318. WSModule.runJob("EMIT_TO_ROOMS", {
  319. rooms: [`station.${stationId}`, `manage-station.${stationId}`],
  320. args: ["event:station.displayName.updated", { data: { stationId, displayName } }]
  321. });
  322. }
  323. });
  324. CacheModule.runJob("SUB", {
  325. channel: "station.descriptionUpdate",
  326. cb: response => {
  327. const { stationId, description } = response;
  328. StationsModule.runJob("GET_STATION", { stationId }).then(station =>
  329. StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
  330. room: `home`,
  331. station
  332. }).then(response => {
  333. const { socketsThatCan } = response;
  334. socketsThatCan.forEach(socket =>
  335. socket.dispatch("event:station.description.updated", { data: { stationId, description } })
  336. );
  337. })
  338. );
  339. WSModule.runJob("EMIT_TO_ROOMS", {
  340. rooms: [`station.${stationId}`, `manage-station.${stationId}`],
  341. args: ["event:station.description.updated", { data: { stationId, description } }]
  342. });
  343. }
  344. });
  345. CacheModule.runJob("SUB", {
  346. channel: "station.themeUpdate",
  347. cb: res => {
  348. const { stationId } = res;
  349. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  350. WSModule.runJob("EMIT_TO_ROOM", {
  351. room: `station.${stationId}`,
  352. args: ["event:station.theme.updated", { data: { theme: station.theme } }]
  353. });
  354. WSModule.runJob("EMIT_TO_ROOM", {
  355. room: `manage-station.${stationId}`,
  356. args: ["event:station.theme.updated", { data: { stationId, theme: station.theme } }]
  357. });
  358. StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
  359. room: `home`,
  360. station
  361. }).then(res => {
  362. const { socketsThatCan } = res;
  363. socketsThatCan.forEach(socket => {
  364. socket.dispatch("event:station.theme.updated", { data: { stationId, theme: station.theme } });
  365. });
  366. });
  367. });
  368. }
  369. });
  370. CacheModule.runJob("SUB", {
  371. channel: "station.queueUpdate",
  372. cb: stationId => {
  373. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  374. WSModule.runJob("EMIT_TO_ROOM", {
  375. room: `station.${stationId}`,
  376. args: ["event:station.queue.updated", { data: { queue: station.queue } }]
  377. });
  378. WSModule.runJob("EMIT_TO_ROOM", {
  379. room: `manage-station.${stationId}`,
  380. args: ["event:manageStation.queue.updated", { data: { stationId, queue: station.queue } }]
  381. });
  382. });
  383. }
  384. });
  385. CacheModule.runJob("SUB", {
  386. channel: "station.repositionSongInQueue",
  387. cb: res => {
  388. WSModule.runJob("EMIT_TO_ROOM", {
  389. room: `station.${res.stationId}`,
  390. args: ["event:station.queue.song.repositioned", { data: { song: res.song } }]
  391. });
  392. WSModule.runJob("EMIT_TO_ROOM", {
  393. room: `manage-station.${res.stationId}`,
  394. args: [
  395. "event:manageStation.queue.song.repositioned",
  396. { data: { stationId: res.stationId, song: res.song } }
  397. ]
  398. });
  399. }
  400. });
  401. CacheModule.runJob("SUB", {
  402. channel: "station.voteSkipSong",
  403. cb: stationId => {
  404. WSModule.runJob("EMIT_TO_ROOM", {
  405. room: `station.${stationId}`,
  406. args: ["event:station.voteSkipSong"]
  407. });
  408. }
  409. });
  410. CacheModule.runJob("SUB", {
  411. channel: "station.remove",
  412. cb: stationId => {
  413. WSModule.runJob("EMIT_TO_ROOM", {
  414. room: `station.${stationId}`,
  415. args: ["event:station.deleted"]
  416. });
  417. WSModule.runJob("EMIT_TO_ROOM", {
  418. room: `manage-station.${stationId}`,
  419. args: ["event:station.deleted"]
  420. });
  421. WSModule.runJob("EMIT_TO_ROOM", {
  422. room: `home`,
  423. args: ["event:station.deleted", { data: { stationId } }]
  424. });
  425. WSModule.runJob("EMIT_TO_ROOM", {
  426. room: "admin.stations",
  427. args: ["event:admin.station.deleted", { data: { stationId } }]
  428. });
  429. }
  430. });
  431. CacheModule.runJob("SUB", {
  432. channel: "station.create",
  433. cb: async stationId => {
  434. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  435. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then(async res => {
  436. const { station } = res;
  437. station.userCount = StationsModule.usersPerStationCount[stationId] || 0;
  438. WSModule.runJob("EMIT_TO_ROOM", {
  439. room: "admin.stations",
  440. args: ["event:admin.station.created", { data: { station } }]
  441. });
  442. if (station.privacy === "public")
  443. WSModule.runJob("EMIT_TO_ROOM", {
  444. room: "home",
  445. args: ["event:station.created", { data: { station } }]
  446. });
  447. else {
  448. const sockets = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
  449. room: "home"
  450. });
  451. sockets.forEach(async socketId => {
  452. const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this);
  453. if (!socket) return;
  454. const { session } = socket;
  455. if (session.sessionId) {
  456. CacheModule.runJob("HGET", {
  457. table: "sessions",
  458. key: session.sessionId
  459. }).then(session => {
  460. if (session) {
  461. userModel.findOne({ _id: session.userId }, (err, user) => {
  462. if (user && user.role === "admin")
  463. socket.dispatch("event:station.created", { data: { station } });
  464. else if (user && station.type === "community" && station.owner === session.userId)
  465. socket.dispatch("event:station.created", { data: { station } });
  466. });
  467. }
  468. });
  469. }
  470. });
  471. }
  472. });
  473. }
  474. });
  475. CacheModule.runJob("SUB", {
  476. channel: "station.updated",
  477. cb: async data => {
  478. const stationModel = await DBModule.runJob("GET_MODEL", {
  479. modelName: "station"
  480. });
  481. stationModel.findOne(
  482. { _id: data.stationId },
  483. ["_id", "name", "displayName", "description", "type", "privacy", "owner", "partyMode", "playMode", "theme"],
  484. (err, station) => {
  485. WSModule.runJob("EMIT_TO_ROOMS", {
  486. rooms: ["admin.stations"],
  487. args: ["event:admin.station.updated", { data: { station } }]
  488. });
  489. }
  490. );
  491. }
  492. });
  493. export default {
  494. /**
  495. * Get a list of all the stations
  496. *
  497. * @param {object} session - user session
  498. * @param {Function} cb - callback
  499. */
  500. async index(session, cb) {
  501. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
  502. async.waterfall(
  503. [
  504. // get array of the ids of the user's favorite stations
  505. next => {
  506. if (session.userId)
  507. return userModel.findById(session.userId).select({ favoriteStations: -1 }).exec(next);
  508. return next(null, { favoriteStations: [] });
  509. },
  510. (user, next) => {
  511. const favoriteStations = user ? user.favoriteStations : [];
  512. CacheModule.runJob("HGETALL", { table: "stations" }, this).then(stations =>
  513. next(null, stations, favoriteStations)
  514. );
  515. },
  516. (stations, favorited, next) => {
  517. const filteredStations = [];
  518. async.eachLimit(
  519. stations,
  520. 1,
  521. (station, nextStation) => {
  522. async.waterfall(
  523. [
  524. callback => {
  525. // only relevant if user logged in
  526. if (session.userId) {
  527. if (favorited.indexOf(station._id) !== -1) station.isFavorited = true;
  528. return callback();
  529. }
  530. return callback();
  531. },
  532. callback => {
  533. StationsModule.runJob(
  534. "CAN_USER_VIEW_STATION",
  535. {
  536. station,
  537. userId: session.userId,
  538. hideUnlisted: true
  539. },
  540. this
  541. )
  542. .then(exists => callback(null, exists))
  543. .catch(callback);
  544. }
  545. ],
  546. (err, exists) => {
  547. if (err) return this.log("ERROR", "STATIONS_INDEX", err);
  548. station.userCount = StationsModule.usersPerStationCount[station._id] || 0;
  549. if (exists) filteredStations.push(station);
  550. return nextStation();
  551. }
  552. );
  553. },
  554. () => next(null, filteredStations, favorited)
  555. );
  556. }
  557. ],
  558. async (err, stations, favorited) => {
  559. if (err) {
  560. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  561. this.log("ERROR", "STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  562. return cb({ status: "error", message: err });
  563. }
  564. this.log("SUCCESS", "STATIONS_INDEX", `Indexing stations successful.`, false);
  565. return cb({ status: "success", data: { stations, favorited } });
  566. }
  567. );
  568. },
  569. /**
  570. * Gets stations, used in the admin stations page by the AdvancedTable component
  571. *
  572. * @param {object} session - the session object automatically added by the websocket
  573. * @param page - the page
  574. * @param pageSize - the size per page
  575. * @param properties - the properties to return for each station
  576. * @param sort - the sort object
  577. * @param queries - the queries array
  578. * @param operator - the operator for queries
  579. * @param cb
  580. */
  581. getData: isAdminRequired(async function getSet(session, page, pageSize, properties, sort, queries, operator, cb) {
  582. async.waterfall(
  583. [
  584. next => {
  585. DBModule.runJob(
  586. "GET_DATA",
  587. {
  588. page,
  589. pageSize,
  590. properties,
  591. sort,
  592. queries,
  593. operator,
  594. modelName: "station",
  595. blacklistedProperties: [],
  596. specialProperties: {
  597. owner: [
  598. {
  599. $addFields: {
  600. ownerOID: {
  601. $convert: {
  602. input: "$owner",
  603. to: "objectId",
  604. onError: "unknown",
  605. onNull: "unknown"
  606. }
  607. }
  608. }
  609. },
  610. {
  611. $lookup: {
  612. from: "users",
  613. localField: "ownerOID",
  614. foreignField: "_id",
  615. as: "ownerUser"
  616. }
  617. },
  618. {
  619. $unwind: {
  620. path: "$ownerUser",
  621. preserveNullAndEmptyArrays: true
  622. }
  623. },
  624. {
  625. $addFields: {
  626. ownerUsername: {
  627. $cond: [
  628. { $eq: [{ $type: "$owner" }, "string"] },
  629. { $ifNull: ["$ownerUser.username", "unknown"] },
  630. "none"
  631. ]
  632. }
  633. }
  634. },
  635. {
  636. $project: {
  637. ownerOID: 0,
  638. ownerUser: 0
  639. }
  640. }
  641. ]
  642. },
  643. specialQueries: {
  644. owner: newQuery => ({ $or: [newQuery, { ownerUsername: newQuery.owner }] })
  645. }
  646. },
  647. this
  648. )
  649. .then(response => {
  650. next(null, response);
  651. })
  652. .catch(err => {
  653. next(err);
  654. });
  655. }
  656. ],
  657. async (err, response) => {
  658. if (err) {
  659. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  660. this.log("ERROR", "STATIONS_GET_DATA", `Failed to get data from stations. "${err}"`);
  661. return cb({ status: "error", message: err });
  662. }
  663. this.log("SUCCESS", "STATIONS_GET_DATA", `Got data from stations successfully.`);
  664. return cb({ status: "success", message: "Successfully got data from stations.", data: response });
  665. }
  666. );
  667. }),
  668. /**
  669. * Obtains basic metadata of a station in order to format an activity
  670. *
  671. * @param {object} session - user session
  672. * @param {string} stationId - the station id
  673. * @param {Function} cb - callback
  674. */
  675. getStationForActivity(session, stationId, cb) {
  676. async.waterfall(
  677. [
  678. next => {
  679. StationsModule.runJob("GET_STATION", { stationId }, this)
  680. .then(station => {
  681. next(null, station);
  682. })
  683. .catch(next);
  684. }
  685. ],
  686. async (err, station) => {
  687. if (err) {
  688. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  689. this.log(
  690. "ERROR",
  691. "STATIONS_GET_STATION_FOR_ACTIVITY",
  692. `Failed to obtain metadata of station ${stationId} for activity formatting. "${err}"`
  693. );
  694. return cb({ status: "error", message: err });
  695. }
  696. this.log(
  697. "SUCCESS",
  698. "STATIONS_GET_STATION_FOR_ACTIVITY",
  699. `Obtained metadata of station ${stationId} for activity formatting successfully.`
  700. );
  701. return cb({
  702. status: "success",
  703. data: {
  704. title: station.displayName,
  705. thumbnail: station.currentSong ? station.currentSong.thumbnail : ""
  706. }
  707. });
  708. }
  709. );
  710. },
  711. /**
  712. * Verifies that a station exists from its name
  713. *
  714. * @param {object} session - user session
  715. * @param {string} stationName - the station name
  716. * @param {Function} cb - callback
  717. */
  718. existsByName(session, stationName, cb) {
  719. async.waterfall(
  720. [
  721. next => {
  722. StationsModule.runJob("GET_STATION_BY_NAME", { stationName }, this)
  723. .then(station => next(null, station))
  724. .catch(next);
  725. },
  726. (station, next) => {
  727. if (!station) return next(null, false);
  728. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  729. .then(exists => next(null, exists))
  730. .catch(next);
  731. }
  732. ],
  733. async (err, exists) => {
  734. if (err) {
  735. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  736. this.log(
  737. "ERROR",
  738. "STATION_EXISTS_BY_NAME",
  739. `Checking if station "${stationName}" exists failed. "${err}"`
  740. );
  741. return cb({ status: "error", message: err });
  742. }
  743. this.log(
  744. "SUCCESS",
  745. "STATION_EXISTS_BY_NAME",
  746. `Station "${stationName}" exists successfully.` /* , false */
  747. );
  748. return cb({ status: "success", data: { exists } });
  749. }
  750. );
  751. },
  752. /**
  753. * Verifies that a station exists from its id
  754. *
  755. * @param {object} session - user session
  756. * @param {string} stationId - the station id
  757. * @param {Function} cb - callback
  758. */
  759. existsById(session, stationId, cb) {
  760. async.waterfall(
  761. [
  762. next => {
  763. StationsModule.runJob("GET_STATION", { stationId }, this)
  764. .then(station => next(null, station))
  765. .catch(next);
  766. },
  767. (station, next) => {
  768. if (!station) return next(null, false);
  769. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  770. .then(exists => next(null, exists))
  771. .catch(next);
  772. }
  773. ],
  774. async (err, exists) => {
  775. if (err) {
  776. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  777. this.log(
  778. "ERROR",
  779. "STATION_EXISTS_BY_ID",
  780. `Checking if station "${stationId}" exists failed. "${err}"`
  781. );
  782. return cb({ status: "error", message: err });
  783. }
  784. this.log(
  785. "SUCCESS",
  786. "STATION_EXISTS_BY_ID",
  787. `Station "${stationId}" exists successfully.` /* , false */
  788. );
  789. return cb({ status: "success", data: { exists } });
  790. }
  791. );
  792. },
  793. /**
  794. * Gets the official playlist for a station
  795. *
  796. * @param {object} session - user session
  797. * @param {string} stationId - the station id
  798. * @param {Function} cb - callback
  799. */
  800. getPlaylist(session, stationId, cb) {
  801. async.waterfall(
  802. [
  803. next => {
  804. StationsModule.runJob("GET_STATION", { stationId }, this)
  805. .then(station => {
  806. next(null, station);
  807. })
  808. .catch(next);
  809. },
  810. (station, next) => {
  811. StationsModule.runJob(
  812. "CAN_USER_VIEW_STATION",
  813. {
  814. station,
  815. userId: session.userId
  816. },
  817. this
  818. )
  819. .then(canView => {
  820. if (canView) return next(null, station);
  821. return next("Insufficient permissions.");
  822. })
  823. .catch(err => next(err));
  824. },
  825. (station, next) => {
  826. if (!station) return next("Station not found.");
  827. if (station.type !== "official") return next("This is not an official station.");
  828. return next();
  829. },
  830. next => {
  831. CacheModule.runJob(
  832. "HGET",
  833. {
  834. table: "officialPlaylists",
  835. key: stationId
  836. },
  837. this
  838. )
  839. .then(playlist => {
  840. next(null, playlist);
  841. })
  842. .catch(next);
  843. },
  844. (playlist, next) => {
  845. if (!playlist) return next("Playlist not found.");
  846. return next(null, playlist);
  847. }
  848. ],
  849. async (err, playlist) => {
  850. if (err) {
  851. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  852. this.log(
  853. "ERROR",
  854. "STATIONS_GET_PLAYLIST",
  855. `Getting playlist for station "${stationId}" failed. "${err}"`
  856. );
  857. return cb({ status: "error", message: err });
  858. }
  859. this.log(
  860. "SUCCESS",
  861. "STATIONS_GET_PLAYLIST",
  862. `Got playlist for station "${stationId}" successfully.`,
  863. false
  864. );
  865. return cb({ status: "success", data: { songs: playlist.songs } });
  866. }
  867. );
  868. },
  869. /**
  870. * Joins the station by its name
  871. *
  872. * @param {object} session - user session
  873. * @param {string} stationIdentifier - the station name or station id
  874. * @param {Function} cb - callback
  875. */
  876. join(session, stationIdentifier, cb) {
  877. async.waterfall(
  878. [
  879. next => {
  880. StationsModule.runJob("GET_STATION_BY_NAME", { stationName: stationIdentifier }, this)
  881. .then(station => next(null, station))
  882. .catch(() =>
  883. // station identifier may be using stationid instead
  884. StationsModule.runJob("GET_STATION", { stationId: stationIdentifier }, this)
  885. .then(station => next(null, station))
  886. .catch(next)
  887. );
  888. },
  889. (station, next) => {
  890. if (!station) return next("Station not found.");
  891. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  892. .then(canView => {
  893. if (!canView) next("Not allowed to join station.");
  894. else next(null, station);
  895. })
  896. .catch(err => next(err));
  897. },
  898. (station, next) => {
  899. WSModule.runJob("SOCKET_JOIN_ROOM", {
  900. socketId: session.socketId,
  901. room: `station.${station._id}`
  902. });
  903. const data = {
  904. _id: station._id,
  905. type: station.type,
  906. currentSong: station.currentSong,
  907. startedAt: station.startedAt,
  908. paused: station.paused,
  909. timePaused: station.timePaused,
  910. pausedAt: station.pausedAt,
  911. description: station.description,
  912. displayName: station.displayName,
  913. name: station.name,
  914. privacy: station.privacy,
  915. locked: station.locked,
  916. partyMode: station.partyMode,
  917. playMode: station.playMode,
  918. owner: station.owner,
  919. includedPlaylists: station.includedPlaylists,
  920. excludedPlaylists: station.excludedPlaylists,
  921. theme: station.theme
  922. };
  923. StationsModule.userList[session.socketId] = station._id;
  924. next(null, data);
  925. },
  926. (data, next) => {
  927. data = JSON.parse(JSON.stringify(data));
  928. data.userCount = StationsModule.usersPerStationCount[data._id] || 0;
  929. data.users = StationsModule.usersPerStation[data._id] || [];
  930. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  931. WSModule.runJob("SOCKET_JOIN_SONG_ROOM", {
  932. socketId: session.socketId,
  933. room: `song.${data.currentSong.youtubeId}`
  934. });
  935. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  936. return SongsModule.runJob(
  937. "GET_SONG_FROM_YOUTUBE_ID",
  938. { youtubeId: data.currentSong.youtubeId },
  939. this
  940. )
  941. .then(response => {
  942. const { song } = response;
  943. if (song) {
  944. data.currentSong.likes = song.likes;
  945. data.currentSong.dislikes = song.dislikes;
  946. } else {
  947. data.currentSong.likes = -1;
  948. data.currentSong.dislikes = -1;
  949. }
  950. })
  951. .catch(() => {
  952. data.currentSong.likes = -1;
  953. data.currentSong.dislikes = -1;
  954. })
  955. .finally(() => next(null, data));
  956. },
  957. (data, next) => {
  958. // only relevant if user logged in
  959. if (session.userId) {
  960. return StationsModule.runJob(
  961. "HAS_USER_FAVORITED_STATION",
  962. {
  963. userId: session.userId,
  964. stationId: data._id
  965. },
  966. this
  967. )
  968. .then(isStationFavorited => {
  969. data.isFavorited = isStationFavorited;
  970. return next(null, data);
  971. })
  972. .catch(err => next(err));
  973. }
  974. return next(null, data);
  975. }
  976. ],
  977. async (err, data) => {
  978. if (err) {
  979. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  980. this.log("ERROR", "STATIONS_JOIN", `Joining station "${stationIdentifier}" failed. "${err}"`);
  981. return cb({ status: "error", message: err });
  982. }
  983. this.log("SUCCESS", "STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  984. return cb({ status: "success", data });
  985. }
  986. );
  987. },
  988. /**
  989. * Gets a station by id
  990. *
  991. * @param {object} session - user session
  992. * @param {string} stationId - the station id
  993. * @param {Function} cb - callback
  994. */
  995. getStationById(session, stationId, cb) {
  996. async.waterfall(
  997. [
  998. next => {
  999. StationsModule.runJob("GET_STATION", { stationId }, this)
  1000. .then(station => {
  1001. next(null, station);
  1002. })
  1003. .catch(next);
  1004. },
  1005. (station, next) => {
  1006. if (!station) return next("Station not found.");
  1007. return StationsModule.runJob(
  1008. "CAN_USER_VIEW_STATION",
  1009. {
  1010. station,
  1011. userId: session.userId
  1012. },
  1013. this
  1014. )
  1015. .then(canView => {
  1016. if (!canView) next("Not allowed to get station.");
  1017. else next(null, station);
  1018. })
  1019. .catch(err => next(err));
  1020. },
  1021. (station, next) => {
  1022. const data = {
  1023. _id: station._id,
  1024. type: station.type,
  1025. description: station.description,
  1026. displayName: station.displayName,
  1027. name: station.name,
  1028. privacy: station.privacy,
  1029. locked: station.locked,
  1030. partyMode: station.partyMode,
  1031. playMode: station.playMode,
  1032. owner: station.owner,
  1033. theme: station.theme,
  1034. paused: station.paused,
  1035. currentSong: station.currentSong
  1036. };
  1037. next(null, data);
  1038. }
  1039. ],
  1040. async (err, data) => {
  1041. if (err) {
  1042. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1043. this.log("ERROR", "GET_STATION_BY_ID", `Getting station "${stationId}" failed. "${err}"`);
  1044. return cb({ status: "error", message: err });
  1045. }
  1046. this.log("SUCCESS", "GET_STATION_BY_ID", `Got station "${stationId}" successfully.`);
  1047. return cb({ status: "success", data: { station: data } });
  1048. }
  1049. );
  1050. },
  1051. getStationIncludedPlaylistsById(session, stationId, cb) {
  1052. async.waterfall(
  1053. [
  1054. next => {
  1055. StationsModule.runJob("GET_STATION", { stationId }, this)
  1056. .then(station => {
  1057. next(null, station);
  1058. })
  1059. .catch(next);
  1060. },
  1061. (station, next) => {
  1062. if (!station) return next("Station not found.");
  1063. return StationsModule.runJob(
  1064. "CAN_USER_VIEW_STATION",
  1065. {
  1066. station,
  1067. userId: session.userId
  1068. },
  1069. this
  1070. )
  1071. .then(canView => {
  1072. if (!canView) next("Not allowed to get station.");
  1073. else next(null, station);
  1074. })
  1075. .catch(err => next(err));
  1076. },
  1077. (station, next) => {
  1078. const playlists = [];
  1079. async.eachLimit(
  1080. station.includedPlaylists,
  1081. 1,
  1082. (playlistId, next) => {
  1083. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
  1084. .then(playlist => {
  1085. playlists.push(playlist);
  1086. next();
  1087. })
  1088. .catch(() => {
  1089. playlists.push(null);
  1090. next();
  1091. });
  1092. },
  1093. err => {
  1094. next(err, playlists);
  1095. }
  1096. );
  1097. }
  1098. ],
  1099. async (err, playlists) => {
  1100. if (err) {
  1101. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1102. this.log(
  1103. "ERROR",
  1104. "GET_STATION_INCLUDED_PLAYLISTS_BY_ID",
  1105. `Getting station "${stationId}"'s included playlists failed. "${err}"`
  1106. );
  1107. return cb({ status: "error", message: err });
  1108. }
  1109. this.log(
  1110. "SUCCESS",
  1111. "GET_STATION_INCLUDED_PLAYLISTS_BY_ID",
  1112. `Got station "${stationId}"'s included playlists successfully.`
  1113. );
  1114. return cb({ status: "success", data: { playlists } });
  1115. }
  1116. );
  1117. },
  1118. getStationExcludedPlaylistsById(session, stationId, cb) {
  1119. async.waterfall(
  1120. [
  1121. next => {
  1122. StationsModule.runJob("GET_STATION", { stationId }, this)
  1123. .then(station => {
  1124. next(null, station);
  1125. })
  1126. .catch(next);
  1127. },
  1128. (station, next) => {
  1129. if (!station) return next("Station not found.");
  1130. return StationsModule.runJob(
  1131. "CAN_USER_VIEW_STATION",
  1132. {
  1133. station,
  1134. userId: session.userId
  1135. },
  1136. this
  1137. )
  1138. .then(canView => {
  1139. if (!canView) next("Not allowed to get station.");
  1140. else next(null, station);
  1141. })
  1142. .catch(err => next(err));
  1143. },
  1144. (station, next) => {
  1145. const playlists = [];
  1146. async.eachLimit(
  1147. station.excludedPlaylists,
  1148. 1,
  1149. (playlistId, next) => {
  1150. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
  1151. .then(playlist => {
  1152. playlists.push(playlist);
  1153. next();
  1154. })
  1155. .catch(() => {
  1156. playlists.push(null);
  1157. next();
  1158. });
  1159. },
  1160. err => {
  1161. next(err, playlists);
  1162. }
  1163. );
  1164. }
  1165. ],
  1166. async (err, playlists) => {
  1167. if (err) {
  1168. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1169. this.log(
  1170. "ERROR",
  1171. "GET_STATION_EXCLUDED_PLAYLISTS_BY_ID",
  1172. `Getting station "${stationId}"'s excluded playlists failed. "${err}"`
  1173. );
  1174. return cb({ status: "error", message: err });
  1175. }
  1176. this.log(
  1177. "SUCCESS",
  1178. "GET_STATION_EXCLUDED_PLAYLISTS_BY_ID",
  1179. `Got station "${stationId}"'s excluded playlists successfully.`
  1180. );
  1181. return cb({ status: "success", data: { playlists } });
  1182. }
  1183. );
  1184. },
  1185. /**
  1186. * Toggles if a station is locked
  1187. *
  1188. * @param session
  1189. * @param stationId - the station id
  1190. * @param cb
  1191. */
  1192. toggleLock: isOwnerRequired(async function toggleLock(session, stationId, cb) {
  1193. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1194. async.waterfall(
  1195. [
  1196. next => {
  1197. StationsModule.runJob("GET_STATION", { stationId }, this)
  1198. .then(station => {
  1199. next(null, station);
  1200. })
  1201. .catch(next);
  1202. },
  1203. (station, next) => {
  1204. stationModel.updateOne({ _id: stationId }, { $set: { locked: !station.locked } }, next);
  1205. },
  1206. (res, next) => {
  1207. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1208. .then(station => {
  1209. next(null, station);
  1210. })
  1211. .catch(next);
  1212. }
  1213. ],
  1214. async (err, station) => {
  1215. if (err) {
  1216. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1217. this.log(
  1218. "ERROR",
  1219. "STATIONS_UPDATE_LOCKED_STATUS",
  1220. `Toggling the queue lock for station "${stationId}" failed. "${err}"`
  1221. );
  1222. return cb({ status: "error", message: err });
  1223. }
  1224. this.log(
  1225. "SUCCESS",
  1226. "STATIONS_UPDATE_LOCKED_STATUS",
  1227. `Toggled the queue lock for station "${stationId}" successfully to "${station.locked}".`
  1228. );
  1229. CacheModule.runJob("PUB", {
  1230. channel: "station.queueLockToggled",
  1231. value: {
  1232. stationId,
  1233. locked: station.locked
  1234. }
  1235. });
  1236. CacheModule.runJob("PUB", {
  1237. channel: "station.updated",
  1238. value: { stationId }
  1239. });
  1240. return cb({ status: "success", data: { locked: station.locked } });
  1241. }
  1242. );
  1243. }),
  1244. /**
  1245. * Votes to skip a station
  1246. *
  1247. * @param session
  1248. * @param stationId - the station id
  1249. * @param cb
  1250. */
  1251. voteSkip: isLoginRequired(async function voteSkip(session, stationId, cb) {
  1252. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1253. let skipVotes = 0;
  1254. let shouldSkip = false;
  1255. async.waterfall(
  1256. [
  1257. next => {
  1258. StationsModule.runJob("GET_STATION", { stationId }, this)
  1259. .then(station => next(null, station))
  1260. .catch(next);
  1261. },
  1262. (station, next) => {
  1263. if (!station) return next("Station not found.");
  1264. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  1265. .then(canView => {
  1266. if (canView) return next(null, station);
  1267. return next("Insufficient permissions.");
  1268. })
  1269. .catch(err => next(err));
  1270. },
  1271. (station, next) => {
  1272. if (!station.currentSong) return next("There is currently no song to skip.");
  1273. if (station.currentSong.skipVotes.indexOf(session.userId) !== -1)
  1274. return next("You have already voted to skip this song.");
  1275. return next(null, station);
  1276. },
  1277. (station, next) => {
  1278. stationModel.updateOne(
  1279. { _id: stationId },
  1280. { $push: { "currentSong.skipVotes": session.userId } },
  1281. next
  1282. );
  1283. },
  1284. (res, next) => {
  1285. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1286. .then(station => {
  1287. next(null, station);
  1288. })
  1289. .catch(next);
  1290. },
  1291. (station, next) => {
  1292. if (!station) return next("Station not found.");
  1293. return next(null, station);
  1294. },
  1295. (station, next) => {
  1296. skipVotes = station.currentSong.skipVotes.length;
  1297. WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: `station.${stationId}` }, this)
  1298. .then(sockets => next(null, sockets))
  1299. .catch(next);
  1300. },
  1301. (sockets, next) => {
  1302. if (sockets.length <= skipVotes) {
  1303. shouldSkip = true;
  1304. return next();
  1305. }
  1306. const users = [];
  1307. return async.each(
  1308. sockets,
  1309. (socketId, next) => {
  1310. WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this)
  1311. .then(socket => {
  1312. if (socket && socket.session && socket.session.userId) {
  1313. if (!users.includes(socket.session.userId)) users.push(socket.session.userId);
  1314. }
  1315. return next();
  1316. })
  1317. .catch(next);
  1318. },
  1319. err => {
  1320. if (err) return next(err);
  1321. if (users.length <= skipVotes) shouldSkip = true;
  1322. return next();
  1323. }
  1324. );
  1325. }
  1326. ],
  1327. async err => {
  1328. if (err) {
  1329. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1330. this.log("ERROR", "STATIONS_VOTE_SKIP", `Vote skipping station "${stationId}" failed. "${err}"`);
  1331. return cb({ status: "error", message: err });
  1332. }
  1333. this.log("SUCCESS", "STATIONS_VOTE_SKIP", `Vote skipping "${stationId}" successful.`);
  1334. CacheModule.runJob("PUB", {
  1335. channel: "station.voteSkipSong",
  1336. value: stationId
  1337. });
  1338. if (shouldSkip) {
  1339. StationsModule.runJob("SKIP_STATION", { stationId, natural: false });
  1340. }
  1341. return cb({
  1342. status: "success",
  1343. message: "Successfully voted to skip the song."
  1344. });
  1345. }
  1346. );
  1347. }),
  1348. /**
  1349. * Force skips a station
  1350. *
  1351. * @param session
  1352. * @param stationId - the station id
  1353. * @param cb
  1354. */
  1355. forceSkip: isOwnerRequired(function forceSkip(session, stationId, cb) {
  1356. async.waterfall(
  1357. [
  1358. next => {
  1359. StationsModule.runJob("GET_STATION", { stationId }, this)
  1360. .then(station => {
  1361. next(null, station);
  1362. })
  1363. .catch(next);
  1364. },
  1365. (station, next) => {
  1366. if (!station) return next("Station not found.");
  1367. return next();
  1368. }
  1369. ],
  1370. async err => {
  1371. if (err) {
  1372. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1373. this.log("ERROR", "STATIONS_FORCE_SKIP", `Force skipping station "${stationId}" failed. "${err}"`);
  1374. return cb({ status: "error", message: err });
  1375. }
  1376. StationsModule.runJob("SKIP_STATION", { stationId, natural: false });
  1377. this.log("SUCCESS", "STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  1378. return cb({
  1379. status: "success",
  1380. message: "Successfully skipped station."
  1381. });
  1382. }
  1383. );
  1384. }),
  1385. /**
  1386. * Leaves the user's current station
  1387. *
  1388. * @param {object} session - user session
  1389. * @param {string} stationId - id of station to leave
  1390. * @param {Function} cb - callback
  1391. */
  1392. leave(session, stationId, cb) {
  1393. async.waterfall(
  1394. [
  1395. next => {
  1396. StationsModule.runJob("GET_STATION", { stationId }, this)
  1397. .then(station => next(null, station))
  1398. .catch(next);
  1399. },
  1400. (station, next) => {
  1401. if (!station) return next("Station not found.");
  1402. return next();
  1403. }
  1404. ],
  1405. async (err, userCount) => {
  1406. if (err) {
  1407. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1408. this.log("ERROR", "STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  1409. return cb({ status: "error", message: err });
  1410. }
  1411. this.log("SUCCESS", "STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  1412. WSModule.runJob("SOCKET_LEAVE_ROOM", { socketId: session.socketId, room: `station.${stationId}` });
  1413. WSModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", { sockets: [session.socketId] });
  1414. delete StationsModule.userList[session.socketId];
  1415. return cb({
  1416. status: "success",
  1417. message: "Successfully left station.",
  1418. data: { userCount }
  1419. });
  1420. }
  1421. );
  1422. },
  1423. /**
  1424. * Updates a station's name
  1425. *
  1426. * @param session
  1427. * @param stationId - the station id
  1428. * @param newName - the new station name
  1429. * @param cb
  1430. */
  1431. updateName: isOwnerRequired(async function updateName(session, stationId, newName, cb) {
  1432. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1433. async.waterfall(
  1434. [
  1435. next => {
  1436. stationModel.updateOne(
  1437. { _id: stationId },
  1438. { $set: { name: newName } },
  1439. { runValidators: true },
  1440. next
  1441. );
  1442. },
  1443. (res, next) => {
  1444. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1445. .then(station => next(null, station))
  1446. .catch(next);
  1447. }
  1448. ],
  1449. async (err, station) => {
  1450. if (err) {
  1451. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1452. this.log(
  1453. "ERROR",
  1454. "STATIONS_UPDATE_NAME",
  1455. `Updating station "${stationId}" name to "${newName}" failed. "${err}"`
  1456. );
  1457. return cb({ status: "error", message: err });
  1458. }
  1459. this.log(
  1460. "SUCCESS",
  1461. "STATIONS_UPDATE_NAME",
  1462. `Updated station "${stationId}" name to "${newName}" successfully.`
  1463. );
  1464. CacheModule.runJob("PUB", {
  1465. channel: "station.nameUpdate",
  1466. value: { stationId, name: newName }
  1467. });
  1468. CacheModule.runJob("PUB", {
  1469. channel: "station.updated",
  1470. value: { stationId }
  1471. });
  1472. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1473. userId: session.userId,
  1474. type: "station__edit_name",
  1475. payload: {
  1476. message: `Changed name of station <stationId>${station.displayName}</stationId> to ${newName}`,
  1477. stationId
  1478. }
  1479. });
  1480. return cb({
  1481. status: "success",
  1482. message: "Successfully updated the name."
  1483. });
  1484. }
  1485. );
  1486. }),
  1487. /**
  1488. * Updates a station's display name
  1489. *
  1490. * @param session
  1491. * @param stationId - the station id
  1492. * @param newDisplayName - the new station display name
  1493. * @param cb
  1494. */
  1495. updateDisplayName: isOwnerRequired(async function updateDisplayName(session, stationId, newDisplayName, cb) {
  1496. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1497. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  1498. async.waterfall(
  1499. [
  1500. next => {
  1501. stationModel.updateOne(
  1502. { _id: stationId },
  1503. { $set: { displayName: newDisplayName } },
  1504. { runValidators: true },
  1505. next
  1506. );
  1507. },
  1508. (res, next) => {
  1509. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1510. .then(station => next(null, station))
  1511. .catch(next);
  1512. },
  1513. (station, next) => {
  1514. playlistModel.updateOne(
  1515. { _id: station.playlist },
  1516. { $set: { displayName: `Station - ${station.displayName}` } },
  1517. err => {
  1518. next(err, station);
  1519. }
  1520. );
  1521. }
  1522. ],
  1523. async err => {
  1524. if (err) {
  1525. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1526. this.log(
  1527. "ERROR",
  1528. "STATIONS_UPDATE_DISPLAY_NAME",
  1529. `Updating station "${stationId}" displayName to "${newDisplayName}" failed. "${err}"`
  1530. );
  1531. return cb({ status: "error", message: err });
  1532. }
  1533. this.log(
  1534. "SUCCESS",
  1535. "STATIONS_UPDATE_DISPLAY_NAME",
  1536. `Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`
  1537. );
  1538. CacheModule.runJob("PUB", {
  1539. channel: "station.displayNameUpdate",
  1540. value: { stationId, displayName: newDisplayName }
  1541. });
  1542. CacheModule.runJob("PUB", {
  1543. channel: "station.updated",
  1544. value: { stationId }
  1545. });
  1546. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1547. userId: session.userId,
  1548. type: "station__edit_display_name",
  1549. payload: {
  1550. message: `Changed display name of station <stationId>${newDisplayName}</stationId>`,
  1551. stationId
  1552. }
  1553. });
  1554. return cb({
  1555. status: "success",
  1556. message: "Successfully updated the display name."
  1557. });
  1558. }
  1559. );
  1560. }),
  1561. /**
  1562. * Updates a station's description
  1563. *
  1564. * @param session
  1565. * @param stationId - the station id
  1566. * @param newDescription - the new station description
  1567. * @param cb
  1568. */
  1569. updateDescription: isOwnerRequired(async function updateDescription(session, stationId, newDescription, cb) {
  1570. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1571. async.waterfall(
  1572. [
  1573. next => {
  1574. stationModel.updateOne(
  1575. { _id: stationId },
  1576. { $set: { description: newDescription } },
  1577. { runValidators: true },
  1578. next
  1579. );
  1580. },
  1581. (res, next) => {
  1582. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1583. .then(station => next(null, station))
  1584. .catch(next);
  1585. }
  1586. ],
  1587. async (err, station) => {
  1588. if (err) {
  1589. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1590. this.log(
  1591. "ERROR",
  1592. "STATIONS_UPDATE_DESCRIPTION",
  1593. `Updating station "${stationId}" description to "${newDescription}" failed. "${err}"`
  1594. );
  1595. return cb({ status: "error", message: err });
  1596. }
  1597. this.log(
  1598. "SUCCESS",
  1599. "STATIONS_UPDATE_DESCRIPTION",
  1600. `Updated station "${stationId}" description to "${newDescription}" successfully.`
  1601. );
  1602. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1603. userId: session.userId,
  1604. type: "station__edit_description",
  1605. payload: {
  1606. message: `Changed description of station <stationId>${station.displayName}</stationId> to ${newDescription}`,
  1607. stationId
  1608. }
  1609. });
  1610. CacheModule.runJob("PUB", {
  1611. channel: "station.descriptionUpdate",
  1612. value: { stationId, description: newDescription }
  1613. });
  1614. CacheModule.runJob("PUB", {
  1615. channel: "station.updated",
  1616. value: { stationId }
  1617. });
  1618. return cb({
  1619. status: "success",
  1620. message: "Successfully updated the description."
  1621. });
  1622. }
  1623. );
  1624. }),
  1625. /**
  1626. * Updates a station's privacy
  1627. *
  1628. * @param session
  1629. * @param stationId - the station id
  1630. * @param newPrivacy - the new station privacy
  1631. * @param cb
  1632. */
  1633. updatePrivacy: isOwnerRequired(async function updatePrivacy(session, stationId, newPrivacy, cb) {
  1634. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  1635. let previousPrivacy = null;
  1636. async.waterfall(
  1637. [
  1638. next => {
  1639. stationModel.findOne({ _id: stationId }, next);
  1640. },
  1641. (station, next) => {
  1642. if (!station) next("No station found.");
  1643. else {
  1644. previousPrivacy = station.privacy;
  1645. next();
  1646. }
  1647. },
  1648. next => {
  1649. stationModel.updateOne(
  1650. { _id: stationId },
  1651. { $set: { privacy: newPrivacy } },
  1652. { runValidators: true },
  1653. next
  1654. );
  1655. },
  1656. (res, next) => {
  1657. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1658. .then(station => next(null, station))
  1659. .catch(next);
  1660. }
  1661. ],
  1662. async (err, station) => {
  1663. if (err) {
  1664. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1665. this.log(
  1666. "ERROR",
  1667. "STATIONS_UPDATE_PRIVACY",
  1668. `Updating station "${stationId}" privacy to "${newPrivacy}" failed. "${err}"`
  1669. );
  1670. return cb({ status: "error", message: err });
  1671. }
  1672. this.log(
  1673. "SUCCESS",
  1674. "STATIONS_UPDATE_PRIVACY",
  1675. `Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`
  1676. );
  1677. CacheModule.runJob("PUB", {
  1678. channel: "station.privacyUpdate",
  1679. value: { stationId, previousPrivacy }
  1680. });
  1681. CacheModule.runJob("PUB", {
  1682. channel: "station.updated",
  1683. value: { stationId }
  1684. });
  1685. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1686. userId: session.userId,
  1687. type: "station__edit_privacy",
  1688. payload: {
  1689. message: `Changed privacy of station <stationId>${station.displayName}</stationId> to ${newPrivacy}`,
  1690. stationId
  1691. }
  1692. });
  1693. return cb({
  1694. status: "success",
  1695. message: "Successfully updated the privacy."
  1696. });
  1697. }
  1698. );
  1699. }),
  1700. /**
  1701. * Updates a station's genres
  1702. *
  1703. * @param session
  1704. * @param stationId - the station id
  1705. * @param newGenres - the new station genres
  1706. * @param cb
  1707. */
  1708. updateGenres: isOwnerRequired(async function updateGenres(session, stationId, newGenres, cb) {
  1709. async.waterfall(
  1710. [
  1711. next => {
  1712. StationsModule.runJob("GET_STATION", { stationId }, this)
  1713. .then(station => {
  1714. next(null, station);
  1715. })
  1716. .catch(next);
  1717. },
  1718. (station, next) => {
  1719. const playlists = [];
  1720. async.eachLimit(
  1721. newGenres,
  1722. 1,
  1723. (genre, next) => {
  1724. PlaylistsModule.runJob("GET_GENRE_PLAYLIST", { genre, includeSongs: false }, this)
  1725. .then(response => {
  1726. playlists.push(response.playlist);
  1727. next();
  1728. })
  1729. .catch(err => {
  1730. if (err.message === "Playlist not found")
  1731. next(
  1732. `The genre playlist for "${genre}" was not found. Please ensure that this genre playlist exists.`
  1733. );
  1734. else next(err);
  1735. });
  1736. },
  1737. err => {
  1738. next(
  1739. err,
  1740. station,
  1741. playlists.map(playlist => playlist._id.toString())
  1742. );
  1743. }
  1744. );
  1745. },
  1746. (station, playlists, next) => {
  1747. const playlistsToRemoveFromExcluded = playlists.filter(
  1748. playlistId => station.excludedPlaylists.indexOf(playlistId) !== -1
  1749. );
  1750. console.log(
  1751. `playlistsToRemoveFromExcluded: ${playlistsToRemoveFromExcluded.length}`,
  1752. playlistsToRemoveFromExcluded
  1753. );
  1754. async.eachLimit(
  1755. playlistsToRemoveFromExcluded,
  1756. 1,
  1757. (playlistId, next) => {
  1758. StationsModule.runJob("REMOVE_EXCLUDED_PLAYLIST", { stationId, playlistId }, this)
  1759. .then(() => {
  1760. next();
  1761. })
  1762. .catch(next);
  1763. },
  1764. err => {
  1765. next(err, station, playlists);
  1766. }
  1767. );
  1768. },
  1769. (station, playlists, next) => {
  1770. const playlistsToRemoveFromIncluded = station.includedPlaylists.filter(
  1771. playlistId => playlists.indexOf(playlistId) === -1
  1772. );
  1773. console.log(
  1774. `playlistsToRemoveFromIncluded: ${playlistsToRemoveFromIncluded.length}`,
  1775. playlistsToRemoveFromIncluded
  1776. );
  1777. async.eachLimit(
  1778. playlistsToRemoveFromIncluded,
  1779. 1,
  1780. (playlistId, next) => {
  1781. StationsModule.runJob("REMOVE_INCLUDED_PLAYLIST", { stationId, playlistId }, this)
  1782. .then(() => {
  1783. next();
  1784. })
  1785. .catch(next);
  1786. },
  1787. err => {
  1788. next(err, station, playlists);
  1789. }
  1790. );
  1791. },
  1792. (station, playlists, next) => {
  1793. const playlistsToAddToIncluded = playlists.filter(
  1794. playlistId => station.includedPlaylists.indexOf(playlistId) === -1
  1795. );
  1796. console.log(
  1797. `playlistsToAddToIncluded: ${playlistsToAddToIncluded.length}`,
  1798. playlistsToAddToIncluded
  1799. );
  1800. async.eachLimit(
  1801. playlistsToAddToIncluded,
  1802. 1,
  1803. (playlistId, next) => {
  1804. StationsModule.runJob("INCLUDE_PLAYLIST", { stationId, playlistId }, this)
  1805. .then(() => {
  1806. next();
  1807. })
  1808. .catch(next);
  1809. },
  1810. err => {
  1811. next(err);
  1812. }
  1813. );
  1814. },
  1815. next => {
  1816. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  1817. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1818. .then(station => next(null, station))
  1819. .catch(next);
  1820. }
  1821. ],
  1822. async (err, station) => {
  1823. if (err) {
  1824. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  1825. this.log(
  1826. "ERROR",
  1827. "STATIONS_UPDATE_GENRES",
  1828. `Updating station "${stationId}" genres to "${newGenres}" failed. "${err}"`
  1829. );
  1830. return cb({ status: "error", message: err });
  1831. }
  1832. this.log(
  1833. "SUCCESS",
  1834. "STATIONS_UPDATE_GENRES",
  1835. `Updated station "${stationId}" genres to "${newGenres}" successfully.`
  1836. );
  1837. if (newGenres.length > 0) {
  1838. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1839. userId: session.userId,
  1840. type: "station__edit_genres",
  1841. payload: {
  1842. message: `Updated genres of station <stationId>${station.displayName}</stationId> to
  1843. ${newGenres.join(", ")}`,
  1844. stationId
  1845. }
  1846. });
  1847. } else {
  1848. ActivitiesModule.runJob("ADD_ACTIVITY", {
  1849. userId: session.userId,
  1850. type: "station__edit_genres",
  1851. payload: {
  1852. message: `Removed all genres of station <stationId>${station.displayName}</stationId>`,
  1853. stationId
  1854. }
  1855. });
  1856. }
  1857. return cb({
  1858. status: "success",
  1859. message: "Successfully updated the genres."
  1860. });
  1861. }
  1862. );
  1863. }),
  1864. /**
  1865. * Updates a station's blacklisted genres
  1866. *
  1867. * @param session
  1868. * @param stationId - the station id
  1869. * @param newBlacklistedGenres - the new station blacklisted genres
  1870. * @param cb
  1871. */
  1872. updateBlacklistedGenres: isOwnerRequired(async function updateBlacklistedGenres(
  1873. session,
  1874. stationId,
  1875. newBlacklistedGenres,
  1876. cb
  1877. ) {
  1878. async.waterfall(
  1879. [
  1880. next => {
  1881. StationsModule.runJob("GET_STATION", { stationId }, this)
  1882. .then(station => {
  1883. next(null, station);
  1884. })
  1885. .catch(next);
  1886. },
  1887. (station, next) => {
  1888. const playlists = [];
  1889. async.eachLimit(
  1890. newBlacklistedGenres,
  1891. 1,
  1892. (genre, next) => {
  1893. PlaylistsModule.runJob("GET_GENRE_PLAYLIST", { genre, includeSongs: false }, this)
  1894. .then(response => {
  1895. playlists.push(response.playlist);
  1896. next();
  1897. })
  1898. .catch(err => {
  1899. if (err.message === "Playlist not found")
  1900. next(
  1901. `The genre playlist for "${genre}" was not found. Please ensure that this genre playlist exists.`
  1902. );
  1903. else next(err);
  1904. });
  1905. },
  1906. err => {
  1907. next(
  1908. err,
  1909. station,
  1910. playlists.map(playlist => playlist._id.toString())
  1911. );
  1912. }
  1913. );
  1914. },
  1915. (station, playlists, next) => {
  1916. const playlistsToRemoveFromIncluded = playlists.filter(
  1917. playlistId => station.includedPlaylists.indexOf(playlistId) !== -1
  1918. );
  1919. console.log(
  1920. `playlistsToRemoveFromIncluded: ${playlistsToRemoveFromIncluded.length}`,
  1921. playlistsToRemoveFromIncluded
  1922. );
  1923. async.eachLimit(
  1924. playlistsToRemoveFromIncluded,
  1925. 1,
  1926. (playlistId, next) => {
  1927. StationsModule.runJob("REMOVE_INCLUDED_PLAYLIST", { stationId, playlistId }, this)
  1928. .then(() => {
  1929. next();
  1930. })
  1931. .catch(next);
  1932. },
  1933. err => {
  1934. next(err, station, playlists);
  1935. }
  1936. );
  1937. },
  1938. (station, playlists, next) => {
  1939. const playlistsToRemoveFromExcluded = station.excludedPlaylists.filter(
  1940. playlistId => playlists.indexOf(playlistId) === -1
  1941. );
  1942. console.log(
  1943. `playlistsToRemoveFromExcluded: ${playlistsToRemoveFromExcluded.length}`,
  1944. playlistsToRemoveFromExcluded
  1945. );
  1946. async.eachLimit(
  1947. playlistsToRemoveFromExcluded,
  1948. 1,
  1949. (playlistId, next) => {
  1950. StationsModule.runJob("REMOVE_EXCLUDED_PLAYLIST", { stationId, playlistId }, this)
  1951. .then(() => {
  1952. next();
  1953. })
  1954. .catch(next);
  1955. },
  1956. err => {
  1957. next(err, station, playlists);
  1958. }
  1959. );
  1960. },
  1961. (station, playlists, next) => {
  1962. const playlistsToAddToExcluded = playlists.filter(
  1963. playlistId => station.excludedPlaylists.indexOf(playlistId) === -1
  1964. );
  1965. console.log(
  1966. `playlistsToAddToExcluded: ${playlistsToAddToExcluded.length}`,
  1967. playlistsToAddToExcluded
  1968. );
  1969. async.eachLimit(
  1970. playlistsToAddToExcluded,
  1971. 1,
  1972. (playlistId, next) => {
  1973. StationsModule.runJob("EXCLUDE_PLAYLIST", { stationId, playlistId }, this)
  1974. .then(() => {
  1975. next();
  1976. })
  1977. .catch(next);
  1978. },
  1979. err => {
  1980. next(err);
  1981. }
  1982. );
  1983. },
  1984. next => {
  1985. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  1986. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  1987. .then(station => next(null, station))
  1988. .catch(next);
  1989. }
  1990. ],
  1991. async (err, station) => {
  1992. if (err) {
  1993. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1994. this.log(
  1995. "ERROR",
  1996. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1997. `Updating station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" failed. "${err}"`
  1998. );
  1999. return cb({ status: "error", message: err });
  2000. }
  2001. this.log(
  2002. "SUCCESS",
  2003. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  2004. `Updated station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" successfully.`
  2005. );
  2006. if (newBlacklistedGenres.length > 0) {
  2007. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2008. userId: session.userId,
  2009. type: "station__edit_blacklisted_genres",
  2010. payload: {
  2011. message: `Updated blacklisted genres of station <stationId>${
  2012. station.displayName
  2013. }</stationId> to ${newBlacklistedGenres.join(", ")}`,
  2014. stationId
  2015. }
  2016. });
  2017. } else {
  2018. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2019. userId: session.userId,
  2020. type: "station__edit_blacklisted_genres",
  2021. payload: {
  2022. message: `Removed all blacklisted genres of station <stationId>${station.displayName}</stationId>`,
  2023. stationId
  2024. }
  2025. });
  2026. }
  2027. return cb({
  2028. status: "success",
  2029. message: "Successfully updated the blacklisted genres."
  2030. });
  2031. }
  2032. );
  2033. }),
  2034. /**
  2035. * Updates a station's party mode
  2036. *
  2037. * @param session
  2038. * @param stationId - the station id
  2039. * @param newPartyMode - the new station party mode
  2040. * @param cb
  2041. */
  2042. updatePartyMode: isOwnerRequired(async function updatePartyMode(session, stationId, newPartyMode, cb) {
  2043. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  2044. async.waterfall(
  2045. [
  2046. next => {
  2047. StationsModule.runJob("GET_STATION", { stationId }, this)
  2048. .then(station => next(null, station))
  2049. .catch(next);
  2050. },
  2051. (station, next) => {
  2052. if (!station) return next("Station not found.");
  2053. if (station.partyMode === newPartyMode)
  2054. return next(`The party mode was already ${newPartyMode ? "enabled." : "disabled."}`);
  2055. return stationModel.updateOne(
  2056. { _id: stationId },
  2057. { $set: { partyMode: newPartyMode, queue: [] } },
  2058. { runValidators: true },
  2059. next
  2060. );
  2061. },
  2062. (res, next) => {
  2063. CacheModule.runJob("PUB", {
  2064. channel: "station.queueUpdate",
  2065. value: stationId
  2066. })
  2067. .then(() => {})
  2068. .catch(next);
  2069. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2070. .then(station => next(null, station))
  2071. .catch(next);
  2072. }
  2073. ],
  2074. async err => {
  2075. if (err) {
  2076. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2077. this.log(
  2078. "ERROR",
  2079. "STATIONS_UPDATE_PARTY_MODE",
  2080. `Updating station "${stationId}" party mode to "${newPartyMode}" failed. "${err}"`
  2081. );
  2082. return cb({ status: "error", message: err });
  2083. }
  2084. this.log(
  2085. "SUCCESS",
  2086. "STATIONS_UPDATE_PARTY_MODE",
  2087. `Updated station "${stationId}" party mode to "${newPartyMode}" successfully.`
  2088. );
  2089. CacheModule.runJob("PUB", {
  2090. channel: "station.updatePartyMode",
  2091. value: {
  2092. stationId,
  2093. partyMode: newPartyMode
  2094. }
  2095. });
  2096. CacheModule.runJob("PUB", {
  2097. channel: "station.updated",
  2098. value: { stationId }
  2099. });
  2100. StationsModule.runJob("SKIP_STATION", { stationId, natural: false });
  2101. return cb({
  2102. status: "success",
  2103. message: "Successfully updated the party mode."
  2104. });
  2105. }
  2106. );
  2107. }),
  2108. /**
  2109. * Updates a station's play mode
  2110. *
  2111. * @param session
  2112. * @param stationId - the station id
  2113. * @param newPlayMode - the new station play mode
  2114. * @param cb
  2115. */
  2116. updatePlayMode: isOwnerRequired(async function updatePartyMode(session, stationId, newPlayMode, cb) {
  2117. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  2118. async.waterfall(
  2119. [
  2120. next => {
  2121. StationsModule.runJob("GET_STATION", { stationId }, this)
  2122. .then(station => {
  2123. next(null, station);
  2124. })
  2125. .catch(next);
  2126. },
  2127. (station, next) => {
  2128. if (!station) return next("Station not found.");
  2129. if (station.newPlayMode === newPlayMode) return next(`The play mode was already ${newPlayMode}`);
  2130. return stationModel.updateOne(
  2131. { _id: stationId },
  2132. { $set: { playMode: newPlayMode, queue: [] } },
  2133. { runValidators: true },
  2134. next
  2135. );
  2136. },
  2137. (res, next) => {
  2138. CacheModule.runJob("PUB", {
  2139. channel: "station.queueUpdate",
  2140. value: stationId
  2141. })
  2142. .then()
  2143. .catch();
  2144. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2145. .then(station => {
  2146. next(null, station);
  2147. })
  2148. .catch(next);
  2149. }
  2150. ],
  2151. async err => {
  2152. if (err) {
  2153. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2154. this.log(
  2155. "ERROR",
  2156. "STATIONS_UPDATE_PLAY_MODE",
  2157. `Updating station "${stationId}" play mode to "${newPlayMode}" failed. "${err}"`
  2158. );
  2159. return cb({ status: "error", message: err });
  2160. }
  2161. this.log(
  2162. "SUCCESS",
  2163. "STATIONS_UPDATE_PLAY_MODE",
  2164. `Updated station "${stationId}" play mode to "${newPlayMode}" successfully.`
  2165. );
  2166. CacheModule.runJob("PUB", {
  2167. channel: "station.newPlayMode",
  2168. value: {
  2169. stationId,
  2170. playMode: newPlayMode
  2171. }
  2172. });
  2173. CacheModule.runJob("PUB", {
  2174. channel: "station.updated",
  2175. value: { stationId }
  2176. });
  2177. StationsModule.runJob("SKIP_STATION", { stationId, natural: false });
  2178. return cb({
  2179. status: "success",
  2180. message: "Successfully updated the play mode."
  2181. });
  2182. }
  2183. );
  2184. }),
  2185. /**
  2186. * Updates a station's theme
  2187. *
  2188. * @param session
  2189. * @param stationId - the station id
  2190. * @param newTheme - the new station theme
  2191. * @param cb
  2192. */
  2193. updateTheme: isOwnerRequired(async function updateTheme(session, stationId, newTheme, cb) {
  2194. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  2195. async.waterfall(
  2196. [
  2197. next => {
  2198. StationsModule.runJob("GET_STATION", { stationId }, this)
  2199. .then(station => {
  2200. next(null, station);
  2201. })
  2202. .catch(next);
  2203. },
  2204. (station, next) => {
  2205. if (!station) return next("Station not found.");
  2206. if (station.theme === newTheme) return next("No change in theme.");
  2207. return stationModel.updateOne(
  2208. { _id: stationId },
  2209. { $set: { theme: newTheme } },
  2210. { runValidators: true },
  2211. next
  2212. );
  2213. },
  2214. (res, next) => {
  2215. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2216. .then(station => next(null, station))
  2217. .catch(next);
  2218. }
  2219. ],
  2220. async (err, station) => {
  2221. if (err) {
  2222. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2223. this.log(
  2224. "ERROR",
  2225. "STATIONS_UPDATE_THEME",
  2226. `Updating station "${stationId}" theme to "${newTheme}" failed. "${err}"`
  2227. );
  2228. return cb({ status: "error", message: err });
  2229. }
  2230. this.log(
  2231. "SUCCESS",
  2232. "STATIONS_UPDATE_THEME",
  2233. `Updated station "${stationId}" theme to "${newTheme}" successfully.`
  2234. );
  2235. CacheModule.runJob("PUB", {
  2236. channel: "station.themeUpdate",
  2237. value: { stationId }
  2238. });
  2239. CacheModule.runJob("PUB", {
  2240. channel: "station.updated",
  2241. value: { stationId }
  2242. });
  2243. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2244. userId: session.userId,
  2245. type: "station__edit_theme",
  2246. payload: {
  2247. message: `Changed theme of station <stationId>${station.displayName}</stationId> to ${newTheme}`,
  2248. stationId
  2249. }
  2250. });
  2251. return cb({
  2252. status: "success",
  2253. message: "Successfully updated the theme."
  2254. });
  2255. }
  2256. );
  2257. }),
  2258. /**
  2259. * Pauses a station
  2260. *
  2261. * @param session
  2262. * @param stationId - the station id
  2263. * @param cb
  2264. */
  2265. pause: isOwnerRequired(async function pause(session, stationId, cb) {
  2266. const stationModel = await DBModule.runJob(
  2267. "GET_MODEL",
  2268. {
  2269. modelName: "station"
  2270. },
  2271. this
  2272. );
  2273. async.waterfall(
  2274. [
  2275. next => {
  2276. StationsModule.runJob("GET_STATION", { stationId }, this)
  2277. .then(station => {
  2278. next(null, station);
  2279. })
  2280. .catch(next);
  2281. },
  2282. (station, next) => {
  2283. if (!station) return next("Station not found.");
  2284. if (station.paused) return next("That station was already paused.");
  2285. return stationModel.updateOne(
  2286. { _id: stationId },
  2287. { $set: { paused: true, pausedAt: Date.now() } },
  2288. next
  2289. );
  2290. },
  2291. (res, next) => {
  2292. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2293. .then(station => {
  2294. next(null, station);
  2295. })
  2296. .catch(next);
  2297. }
  2298. ],
  2299. async err => {
  2300. if (err) {
  2301. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2302. this.log("ERROR", "STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  2303. return cb({ status: "error", message: err });
  2304. }
  2305. this.log("SUCCESS", "STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  2306. CacheModule.runJob("PUB", {
  2307. channel: "station.pause",
  2308. value: stationId
  2309. });
  2310. NotificationsModule.runJob("UNSCHEDULE", {
  2311. name: `stations.nextSong?id=${stationId}`
  2312. });
  2313. return cb({
  2314. status: "success",
  2315. message: "Successfully paused."
  2316. });
  2317. }
  2318. );
  2319. }),
  2320. /**
  2321. * Resumes a station
  2322. *
  2323. * @param session
  2324. * @param stationId - the station id
  2325. * @param cb
  2326. */
  2327. resume: isOwnerRequired(async function resume(session, stationId, cb) {
  2328. const stationModel = await DBModule.runJob(
  2329. "GET_MODEL",
  2330. {
  2331. modelName: "station"
  2332. },
  2333. this
  2334. );
  2335. async.waterfall(
  2336. [
  2337. next => {
  2338. StationsModule.runJob("GET_STATION", { stationId }, this)
  2339. .then(station => {
  2340. next(null, station);
  2341. })
  2342. .catch(next);
  2343. },
  2344. (station, next) => {
  2345. if (!station) return next("Station not found.");
  2346. if (!station.paused) return next("That station is not paused.");
  2347. station.timePaused += Date.now() - station.pausedAt;
  2348. return stationModel.updateOne(
  2349. { _id: stationId },
  2350. {
  2351. $set: { paused: false },
  2352. $inc: { timePaused: Date.now() - station.pausedAt }
  2353. },
  2354. next
  2355. );
  2356. },
  2357. (res, next) => {
  2358. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2359. .then(station => {
  2360. next(null, station);
  2361. })
  2362. .catch(next);
  2363. }
  2364. ],
  2365. async err => {
  2366. if (err) {
  2367. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2368. this.log("ERROR", "STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  2369. return cb({ status: "error", message: err });
  2370. }
  2371. this.log("SUCCESS", "STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  2372. CacheModule.runJob("PUB", {
  2373. channel: "station.resume",
  2374. value: stationId
  2375. });
  2376. return cb({
  2377. status: "success",
  2378. message: "Successfully resumed."
  2379. });
  2380. }
  2381. );
  2382. }),
  2383. /**
  2384. * Removes a station
  2385. *
  2386. * @param session
  2387. * @param stationId - the station id
  2388. * @param cb
  2389. */
  2390. remove: isOwnerRequired(async function remove(session, stationId, cb) {
  2391. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  2392. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  2393. async.waterfall(
  2394. [
  2395. next => {
  2396. stationModel.findById(stationId, (err, station) => {
  2397. if (err) return next(err);
  2398. return next(null, station);
  2399. });
  2400. },
  2401. (station, next) => {
  2402. stationModel.deleteOne({ _id: stationId }, err => next(err, station));
  2403. },
  2404. (station, next) => {
  2405. CacheModule.runJob("HDEL", { table: "stations", key: stationId }, this)
  2406. .then(() => next(null, station))
  2407. .catch(next);
  2408. },
  2409. // remove the playlist for the station
  2410. (station, next) => {
  2411. if (station.playlist)
  2412. PlaylistsModule.runJob("DELETE_PLAYLIST", { playlistId: station.playlist })
  2413. .then(() => {})
  2414. .catch(next);
  2415. next(null, station);
  2416. },
  2417. // remove reference to the station id in any array of a user's favorite stations
  2418. (station, next) => {
  2419. userModel.updateMany(
  2420. { favoriteStations: stationId },
  2421. { $pull: { favoriteStations: stationId } },
  2422. err => next(err, station)
  2423. );
  2424. }
  2425. ],
  2426. async (err, station) => {
  2427. if (err) {
  2428. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2429. this.log("ERROR", "STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  2430. return cb({ status: "error", message: err });
  2431. }
  2432. this.log("SUCCESS", "STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  2433. CacheModule.runJob("PUB", {
  2434. channel: "station.remove",
  2435. value: stationId
  2436. });
  2437. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2438. userId: session.userId,
  2439. type: "station__remove",
  2440. payload: { message: `Removed a station named ${station.displayName}` }
  2441. });
  2442. ActivitiesModule.runJob("REMOVE_ACTIVITY_REFERENCES", { type: "stationId", stationId });
  2443. return cb({
  2444. status: "success",
  2445. message: "Successfully removed."
  2446. });
  2447. }
  2448. );
  2449. }),
  2450. /**
  2451. * Create a station
  2452. *
  2453. * @param session
  2454. * @param data - the station data
  2455. * @param cb
  2456. */
  2457. create: isLoginRequired(async function create(session, data, cb) {
  2458. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  2459. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  2460. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  2461. data.name = data.name.toLowerCase();
  2462. let blacklist = [
  2463. "about",
  2464. "support",
  2465. "staff",
  2466. "help",
  2467. "news",
  2468. "terms",
  2469. "privacy",
  2470. "profile",
  2471. "c",
  2472. "community",
  2473. "tos",
  2474. "login",
  2475. "register",
  2476. "p",
  2477. "official",
  2478. "o",
  2479. "faq",
  2480. "team",
  2481. "donate",
  2482. "buy",
  2483. "shop",
  2484. "forums",
  2485. "explore",
  2486. "settings",
  2487. "admin",
  2488. "auth",
  2489. "reset_password",
  2490. "backend",
  2491. "api",
  2492. "songs",
  2493. "playlists",
  2494. "playlist",
  2495. "albums",
  2496. "artists",
  2497. "artist",
  2498. "station"
  2499. ];
  2500. if (data.type === "community" && config.has("blacklistedCommunityStationNames"))
  2501. blacklist = [...blacklist, ...config.get("blacklistedCommunityStationNames")];
  2502. async.waterfall(
  2503. [
  2504. next => {
  2505. if (!data) return next("Invalid data.");
  2506. return next();
  2507. },
  2508. next => {
  2509. stationModel.findOne(
  2510. {
  2511. $or: [{ name: data.name }, { displayName: new RegExp(`^${data.displayName}$`, "i") }]
  2512. },
  2513. next
  2514. );
  2515. },
  2516. (station, next) => {
  2517. this.log(station);
  2518. if (station) return next("A station with that name or display name already exists.");
  2519. if (blacklist.indexOf(data.name) !== -1)
  2520. return next("That name is blacklisted. Please use a different name.");
  2521. if (data.type === "official") {
  2522. return userModel.findOne({ _id: session.userId }, (err, user) => {
  2523. if (err) return next(err);
  2524. if (!user) return next("User not found.");
  2525. if (user.role !== "admin") return next("Admin required.");
  2526. return next();
  2527. });
  2528. }
  2529. return next();
  2530. },
  2531. next => {
  2532. const stationId = mongoose.Types.ObjectId();
  2533. playlistModel.create(
  2534. {
  2535. displayName: `Station - ${data.name}`,
  2536. songs: [],
  2537. createdBy: data.type === "official" ? "Musare" : session.userId,
  2538. createdFor: `${stationId}`,
  2539. createdAt: Date.now(),
  2540. type: "station"
  2541. },
  2542. (err, playlist) => {
  2543. next(err, playlist, stationId);
  2544. }
  2545. );
  2546. },
  2547. (playlist, stationId, next) => {
  2548. const { name, displayName, description, type } = data;
  2549. if (type === "official") {
  2550. stationModel.create(
  2551. {
  2552. _id: stationId,
  2553. name,
  2554. displayName,
  2555. description,
  2556. playlist: playlist._id,
  2557. type,
  2558. privacy: "private",
  2559. queue: [],
  2560. currentSong: null,
  2561. partyMode: false,
  2562. playMode: "random"
  2563. },
  2564. next
  2565. );
  2566. } else {
  2567. stationModel.create(
  2568. {
  2569. _id: stationId,
  2570. name,
  2571. displayName,
  2572. description,
  2573. playlist: playlist._id,
  2574. type,
  2575. privacy: "private",
  2576. owner: session.userId,
  2577. queue: [],
  2578. currentSong: null,
  2579. partyMode: true,
  2580. playMode: "random"
  2581. },
  2582. next
  2583. );
  2584. }
  2585. }
  2586. ],
  2587. async (err, station) => {
  2588. if (err) {
  2589. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2590. this.log("ERROR", "STATIONS_CREATE", `Creating station failed. "${err}"`);
  2591. cb({ status: "error", message: err });
  2592. } else {
  2593. this.log("SUCCESS", "STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  2594. CacheModule.runJob("PUB", {
  2595. channel: "station.create",
  2596. value: station._id
  2597. });
  2598. ActivitiesModule.runJob("ADD_ACTIVITY", {
  2599. userId: session.userId,
  2600. type: "station__create",
  2601. payload: {
  2602. message: `Created a station named <stationId>${station.displayName}</stationId>`,
  2603. stationId: station._id
  2604. }
  2605. });
  2606. cb({
  2607. status: "success",
  2608. message: "Successfully created station."
  2609. });
  2610. }
  2611. }
  2612. );
  2613. }),
  2614. /**
  2615. * Adds song to station queue
  2616. *
  2617. * @param session
  2618. * @param stationId - the station id
  2619. * @param youtubeId - the song id
  2620. * @param cb
  2621. */
  2622. addToQueue: isLoginRequired(async function addToQueue(session, stationId, youtubeId, cb) {
  2623. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  2624. const stationModel = await DBModule.runJob(
  2625. "GET_MODEL",
  2626. {
  2627. modelName: "station"
  2628. },
  2629. this
  2630. );
  2631. async.waterfall(
  2632. [
  2633. next => {
  2634. StationsModule.runJob("GET_STATION", { stationId }, this)
  2635. .then(station => {
  2636. next(null, station);
  2637. })
  2638. .catch(next);
  2639. },
  2640. (station, next) => {
  2641. if (!station) return next("Station not found.");
  2642. if (!station.partyMode) return next("Station is not in party mode.");
  2643. if (station.locked) {
  2644. return userModel.findOne({ _id: session.userId }, (err, user) => {
  2645. if (user.role !== "admin" && station.owner !== session.userId)
  2646. return next("Only owners and admins can add songs to a locked queue.");
  2647. return next(null, station);
  2648. });
  2649. }
  2650. return next(null, station);
  2651. },
  2652. (station, next) => {
  2653. if (station.type !== "community") return next("That station is not a community station.");
  2654. return StationsModule.runJob(
  2655. "CAN_USER_VIEW_STATION",
  2656. {
  2657. station,
  2658. userId: session.userId
  2659. },
  2660. this
  2661. )
  2662. .then(canView => {
  2663. if (canView) return next(null, station);
  2664. return next("Insufficient permissions.");
  2665. })
  2666. .catch(err => next(err));
  2667. },
  2668. (station, next) => {
  2669. if (station.currentSong && station.currentSong.youtubeId === youtubeId)
  2670. return next("That song is currently playing.");
  2671. return async.each(
  2672. station.queue,
  2673. (queueSong, next) => {
  2674. if (queueSong.youtubeId === youtubeId) return next("That song is already in the queue.");
  2675. return next();
  2676. },
  2677. err => next(err, station)
  2678. );
  2679. },
  2680. (station, next) => {
  2681. DBModule.runJob("GET_MODEL", { modelName: "user" }, this)
  2682. .then(UserModel => {
  2683. UserModel.findOne(
  2684. { _id: session.userId },
  2685. { "preferences.anonymousSongRequests": 1 },
  2686. (err, user) => next(err, station, user)
  2687. );
  2688. })
  2689. .catch(next);
  2690. },
  2691. (station, user, next) => {
  2692. SongsModule.runJob(
  2693. "ENSURE_SONG_EXISTS_BY_YOUTUBE_ID",
  2694. {
  2695. youtubeId,
  2696. userId: user.preferences.anonymousSongRequests ? null : session.userId,
  2697. automaticallyRequested: true
  2698. },
  2699. this
  2700. )
  2701. .then(response => {
  2702. const { song } = response;
  2703. const { _id, title, skipDuration, artists, thumbnail, duration, verified } = song;
  2704. next(
  2705. null,
  2706. {
  2707. _id,
  2708. youtubeId,
  2709. title,
  2710. skipDuration,
  2711. artists,
  2712. thumbnail,
  2713. duration,
  2714. verified
  2715. },
  2716. station
  2717. );
  2718. })
  2719. .catch(next);
  2720. },
  2721. (song, station, next) => {
  2722. const excludedPlaylists = [];
  2723. async.eachLimit(
  2724. station.excludedPlaylists,
  2725. 1,
  2726. (playlistId, next) => {
  2727. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
  2728. .then(playlist => {
  2729. excludedPlaylists.push(playlist);
  2730. next();
  2731. })
  2732. .catch(next);
  2733. },
  2734. err => {
  2735. next(err, song, station, excludedPlaylists);
  2736. }
  2737. );
  2738. },
  2739. (song, station, excludedPlaylists, next) => {
  2740. const excludedSongs = excludedPlaylists
  2741. .flatMap(excludedPlaylist => excludedPlaylist.songs)
  2742. .reduce(
  2743. (items, item) =>
  2744. items.find(x => x.youtubeId === item.youtubeId) ? [...items] : [...items, item],
  2745. []
  2746. );
  2747. if (excludedSongs.find(excludedSong => excludedSong._id.toString() === song._id.toString()))
  2748. next("That song is in an excluded playlist and cannot be played.");
  2749. else next(null, song, station);
  2750. },
  2751. (song, station, next) => {
  2752. song.requestedBy = session.userId;
  2753. song.requestedAt = Date.now();
  2754. return next(null, song);
  2755. },
  2756. // (song, station, next) => {
  2757. // song.requestedBy = session.userId;
  2758. // song.requestedAt = Date.now();
  2759. // let totalDuration = 0;
  2760. // station.queue.forEach(song => {
  2761. // totalDuration += song.duration;
  2762. // });
  2763. // if (totalDuration >= 3600 * 3) return next("The max length of the queue is 3 hours.");
  2764. // return next(null, song, station);
  2765. // },
  2766. // (song, station, next) => {
  2767. // if (station.queue.length === 0) return next(null, song, station);
  2768. // let totalDuration = 0;
  2769. // const userId = station.queue[station.queue.length - 1].requestedBy;
  2770. // station.queue.forEach(song => {
  2771. // if (userId === song.requestedBy) {
  2772. // totalDuration += song.duration;
  2773. // }
  2774. // });
  2775. // if (totalDuration >= 900) return next("The max length of songs per user is 15 minutes.");
  2776. // return next(null, song, station);
  2777. // },
  2778. // (song, station, next) => {
  2779. // if (station.queue.length === 0) return next(null, song);
  2780. // let totalSongs = 0;
  2781. // const userId = station.queue[station.queue.length - 1].requestedBy;
  2782. // station.queue.forEach(song => {
  2783. // if (userId === song.requestedBy) {
  2784. // totalSongs += 1;
  2785. // }
  2786. // });
  2787. // if (totalSongs <= 2) return next(null, song);
  2788. // if (totalSongs > 3)
  2789. // return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  2790. // if (
  2791. // station.queue[station.queue.length - 2].requestedBy !== userId ||
  2792. // station.queue[station.queue.length - 3] !== userId
  2793. // )
  2794. // return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  2795. // return next(null, song);
  2796. // },
  2797. (song, next) => {
  2798. stationModel.updateOne(
  2799. { _id: stationId },
  2800. { $push: { queue: song } },
  2801. { runValidators: true },
  2802. next
  2803. );
  2804. },
  2805. (res, next) => {
  2806. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2807. .then(station => next(null, station))
  2808. .catch(next);
  2809. }
  2810. ],
  2811. async err => {
  2812. if (err) {
  2813. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2814. this.log(
  2815. "ERROR",
  2816. "STATIONS_ADD_SONG_TO_QUEUE",
  2817. `Adding song "${youtubeId}" to station "${stationId}" queue failed. "${err}"`
  2818. );
  2819. return cb({ status: "error", message: err });
  2820. }
  2821. this.log(
  2822. "SUCCESS",
  2823. "STATIONS_ADD_SONG_TO_QUEUE",
  2824. `Added song "${youtubeId}" to station "${stationId}" successfully.`
  2825. );
  2826. CacheModule.runJob("PUB", {
  2827. channel: "station.queueUpdate",
  2828. value: stationId
  2829. });
  2830. return cb({
  2831. status: "success",
  2832. message: "Successfully added song to queue."
  2833. });
  2834. }
  2835. );
  2836. }),
  2837. /**
  2838. * Removes song from station queue
  2839. *
  2840. * @param session
  2841. * @param stationId - the station id
  2842. * @param youtubeId - the youtube id
  2843. * @param cb
  2844. */
  2845. removeFromQueue: isOwnerRequired(async function removeFromQueue(session, stationId, youtubeId, cb) {
  2846. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  2847. async.waterfall(
  2848. [
  2849. next => {
  2850. if (!youtubeId) return next("Invalid youtube id.");
  2851. return StationsModule.runJob("GET_STATION", { stationId }, this)
  2852. .then(station => next(null, station))
  2853. .catch(next);
  2854. },
  2855. (station, next) => {
  2856. if (!station) return next("Station not found.");
  2857. return async.each(
  2858. station.queue,
  2859. (queueSong, next) => {
  2860. if (queueSong.youtubeId === youtubeId) return next(true);
  2861. return next();
  2862. },
  2863. err => {
  2864. if (err === true) return next();
  2865. return next("Song is not currently in the queue.");
  2866. }
  2867. );
  2868. },
  2869. next => {
  2870. stationModel.updateOne({ _id: stationId }, { $pull: { queue: { youtubeId } } }, next);
  2871. },
  2872. (res, next) => {
  2873. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2874. .then(station => next(null, station))
  2875. .catch(next);
  2876. }
  2877. ],
  2878. async err => {
  2879. if (err) {
  2880. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2881. this.log(
  2882. "ERROR",
  2883. "STATIONS_REMOVE_SONG_TO_QUEUE",
  2884. `Removing song "${youtubeId}" from station "${stationId}" queue failed. "${err}"`
  2885. );
  2886. return cb({ status: "error", message: err });
  2887. }
  2888. this.log(
  2889. "SUCCESS",
  2890. "STATIONS_REMOVE_SONG_TO_QUEUE",
  2891. `Removed song "${youtubeId}" from station "${stationId}" successfully.`
  2892. );
  2893. CacheModule.runJob("PUB", {
  2894. channel: "station.queueUpdate",
  2895. value: stationId
  2896. });
  2897. return cb({
  2898. status: "success",
  2899. message: "Successfully removed song from queue."
  2900. });
  2901. }
  2902. );
  2903. }),
  2904. /**
  2905. * Gets the queue from a station
  2906. *
  2907. * @param {object} session - user session
  2908. * @param {string} stationId - the station id
  2909. * @param {Function} cb - callback
  2910. */
  2911. getQueue(session, stationId, cb) {
  2912. async.waterfall(
  2913. [
  2914. next => {
  2915. StationsModule.runJob("GET_STATION", { stationId }, this)
  2916. .then(station => next(null, station))
  2917. .catch(next);
  2918. },
  2919. (station, next) => {
  2920. if (!station) return next("Station not found.");
  2921. return next(null, station);
  2922. },
  2923. (station, next) => {
  2924. StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  2925. .then(canView => {
  2926. if (canView) return next(null, station);
  2927. return next("Insufficient permissions.");
  2928. })
  2929. .catch(err => next(err));
  2930. },
  2931. (station, next) => next(null, station.queue)
  2932. ],
  2933. async (err, queue) => {
  2934. if (err) {
  2935. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2936. this.log(
  2937. "ERROR",
  2938. "STATIONS_GET_QUEUE",
  2939. `Getting queue for station "${stationId}" failed. "${err}"`
  2940. );
  2941. return cb({ status: "error", message: err });
  2942. }
  2943. this.log("SUCCESS", "STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  2944. return cb({
  2945. status: "success",
  2946. message: "Successfully got queue.",
  2947. data: { queue }
  2948. });
  2949. }
  2950. );
  2951. },
  2952. /**
  2953. * Reposition a song in station queue
  2954. *
  2955. * @param {object} session - user session
  2956. * @param {object} song - contains details about the song that is to be repositioned
  2957. * @param {string} song.youtubeId - the youtube id of the song
  2958. * @param {number} song.newIndex - the new position for the song in the queue
  2959. * @param {number} song.oldIndex - the old position of the song in the queue
  2960. * @param {string} stationId - the station id
  2961. * @param {Function} cb - callback
  2962. */
  2963. repositionSongInQueue: isOwnerRequired(async function repositionQueue(session, stationId, song, cb) {
  2964. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  2965. async.waterfall(
  2966. [
  2967. next => {
  2968. if (!song || !song.youtubeId) return next("You must provide a song to reposition.");
  2969. return next();
  2970. },
  2971. // remove song from queue
  2972. next => {
  2973. stationModel.updateOne(
  2974. { _id: stationId },
  2975. { $pull: { queue: { youtubeId: song.youtubeId } } },
  2976. next
  2977. );
  2978. },
  2979. // add song back to queue (in new position)
  2980. (res, next) => {
  2981. stationModel.updateOne(
  2982. { _id: stationId },
  2983. { $push: { queue: { $each: [song], $position: song.newIndex } } },
  2984. err => next(err)
  2985. );
  2986. },
  2987. // update the cache representation of the station
  2988. next => {
  2989. StationsModule.runJob("UPDATE_STATION", { stationId }, this)
  2990. .then(station => next(null, station))
  2991. .catch(next);
  2992. }
  2993. ],
  2994. async err => {
  2995. if (err) {
  2996. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  2997. this.log(
  2998. "ERROR",
  2999. "STATIONS_REPOSITION_SONG_IN_QUEUE",
  3000. `Repositioning song ${song.youtubeId} in queue of station "${stationId}" failed. "${err}"`
  3001. );
  3002. return cb({ status: "error", message: err });
  3003. }
  3004. this.log(
  3005. "SUCCESS",
  3006. "STATIONS_REPOSITION_SONG_IN_QUEUE",
  3007. `Repositioned song ${song.youtubeId} in queue of station "${stationId}" successfully.`
  3008. );
  3009. CacheModule.runJob("PUB", {
  3010. channel: "station.repositionSongInQueue",
  3011. value: {
  3012. song: {
  3013. youtubeId: song.youtubeId,
  3014. oldIndex: song.oldIndex,
  3015. newIndex: song.newIndex
  3016. },
  3017. stationId
  3018. }
  3019. });
  3020. return cb({
  3021. status: "success",
  3022. message: "Successfully repositioned song in queue."
  3023. });
  3024. }
  3025. );
  3026. }),
  3027. /**
  3028. * Includes a playlist in a station
  3029. *
  3030. * @param session
  3031. * @param stationId - the station id
  3032. * @param playlistId - the playlist id
  3033. * @param cb
  3034. */
  3035. includePlaylist: isOwnerRequired(async function includePlaylist(session, stationId, playlistId, cb) {
  3036. async.waterfall(
  3037. [
  3038. next => {
  3039. StationsModule.runJob("GET_STATION", { stationId }, this)
  3040. .then(station => next(null, station))
  3041. .catch(next);
  3042. },
  3043. (station, next) => {
  3044. if (!station) return next("Station not found.");
  3045. if (station.includedPlaylists.indexOf(playlistId) !== -1)
  3046. return next("That playlist is already included.");
  3047. if (station.playMode === "sequential" && station.includedPlaylists.length > 0)
  3048. return next("Error: Only 1 playlist can be included in sequential play mode.");
  3049. return next();
  3050. },
  3051. next => {
  3052. StationsModule.runJob("INCLUDE_PLAYLIST", { stationId, playlistId }, this)
  3053. .then(() => {
  3054. next();
  3055. })
  3056. .catch(next);
  3057. }
  3058. ],
  3059. async err => {
  3060. if (err) {
  3061. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3062. this.log(
  3063. "ERROR",
  3064. "STATIONS_INCLUDE_PLAYLIST",
  3065. `Including playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  3066. );
  3067. return cb({ status: "error", message: err });
  3068. }
  3069. this.log(
  3070. "SUCCESS",
  3071. "STATIONS_INCLUDE_PLAYLIST",
  3072. `Including playlist "${playlistId}" for station "${stationId}" successfully.`
  3073. );
  3074. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  3075. CacheModule.runJob("PUB", {
  3076. channel: "station.includedPlaylist",
  3077. value: {
  3078. playlistId,
  3079. stationId
  3080. }
  3081. });
  3082. return cb({
  3083. status: "success",
  3084. message: "Successfully included playlist."
  3085. });
  3086. }
  3087. );
  3088. }),
  3089. /**
  3090. * Remove included a playlist from a station
  3091. *
  3092. * @param session
  3093. * @param stationId - the station id
  3094. * @param playlistId - the playlist id
  3095. * @param cb
  3096. */
  3097. removeIncludedPlaylist: isOwnerRequired(async function removeIncludedPlaylist(session, stationId, playlistId, cb) {
  3098. async.waterfall(
  3099. [
  3100. next => {
  3101. StationsModule.runJob("GET_STATION", { stationId }, this)
  3102. .then(station => next(null, station))
  3103. .catch(next);
  3104. },
  3105. (station, next) => {
  3106. if (!station) return next("Station not found.");
  3107. if (station.includedPlaylists.indexOf(playlistId) === -1)
  3108. return next("That playlist is not included.");
  3109. return next();
  3110. },
  3111. next => {
  3112. StationsModule.runJob("REMOVE_INCLUDED_PLAYLIST", { stationId, playlistId }, this)
  3113. .then(() => {
  3114. next();
  3115. })
  3116. .catch(next);
  3117. }
  3118. ],
  3119. async err => {
  3120. if (err) {
  3121. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3122. this.log(
  3123. "ERROR",
  3124. "STATIONS_REMOVE_INCLUDED_PLAYLIST",
  3125. `Removing included playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  3126. );
  3127. return cb({ status: "error", message: err });
  3128. }
  3129. this.log(
  3130. "SUCCESS",
  3131. "STATIONS_REMOVE_INCLUDED_PLAYLIST",
  3132. `Removing included playlist "${playlistId}" for station "${stationId}" successfully.`
  3133. );
  3134. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  3135. CacheModule.runJob("PUB", {
  3136. channel: "station.removedIncludedPlaylist",
  3137. value: {
  3138. playlistId,
  3139. stationId
  3140. }
  3141. });
  3142. return cb({
  3143. status: "success",
  3144. message: "Successfully removed included playlist."
  3145. });
  3146. }
  3147. );
  3148. }),
  3149. /**
  3150. * Excludes a playlist in a station
  3151. *
  3152. * @param session
  3153. * @param stationId - the station id
  3154. * @param playlistId - the playlist id
  3155. * @param cb
  3156. */
  3157. excludePlaylist: isOwnerRequired(async function excludePlaylist(session, stationId, playlistId, cb) {
  3158. async.waterfall(
  3159. [
  3160. next => {
  3161. StationsModule.runJob("GET_STATION", { stationId }, this)
  3162. .then(station => next(null, station))
  3163. .catch(next);
  3164. },
  3165. (station, next) => {
  3166. if (!station) return next("Station not found.");
  3167. if (station.excludedPlaylists.indexOf(playlistId) !== -1)
  3168. return next("That playlist is already excluded.");
  3169. return next();
  3170. },
  3171. next => {
  3172. StationsModule.runJob("EXCLUDE_PLAYLIST", { stationId, playlistId }, this)
  3173. .then(() => {
  3174. next();
  3175. })
  3176. .catch(next);
  3177. }
  3178. ],
  3179. async err => {
  3180. if (err) {
  3181. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3182. this.log(
  3183. "ERROR",
  3184. "STATIONS_EXCLUDE_PLAYLIST",
  3185. `Excluding playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  3186. );
  3187. return cb({ status: "error", message: err });
  3188. }
  3189. this.log(
  3190. "SUCCESS",
  3191. "STATIONS_EXCLUDE_PLAYLIST",
  3192. `Excluding playlist "${playlistId}" for station "${stationId}" successfully.`
  3193. );
  3194. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  3195. CacheModule.runJob("PUB", {
  3196. channel: "station.excludedPlaylist",
  3197. value: {
  3198. playlistId,
  3199. stationId
  3200. }
  3201. });
  3202. return cb({
  3203. status: "success",
  3204. message: "Successfully excluded playlist."
  3205. });
  3206. }
  3207. );
  3208. }),
  3209. /**
  3210. * Remove excluded a playlist from a station
  3211. *
  3212. * @param session
  3213. * @param stationId - the station id
  3214. * @param playlistId - the playlist id
  3215. * @param cb
  3216. */
  3217. removeExcludedPlaylist: isOwnerRequired(async function removeExcludedPlaylist(session, stationId, playlistId, cb) {
  3218. async.waterfall(
  3219. [
  3220. next => {
  3221. StationsModule.runJob("GET_STATION", { stationId }, this)
  3222. .then(station => next(null, station))
  3223. .catch(next);
  3224. },
  3225. (station, next) => {
  3226. if (!station) return next("Station not found.");
  3227. if (station.excludedPlaylists.indexOf(playlistId) === -1)
  3228. return next("That playlist is not excluded.");
  3229. return next();
  3230. },
  3231. next => {
  3232. StationsModule.runJob("REMOVE_EXCLUDED_PLAYLIST", { stationId, playlistId }, this)
  3233. .then(() => {
  3234. next();
  3235. })
  3236. .catch(next);
  3237. }
  3238. ],
  3239. async err => {
  3240. if (err) {
  3241. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3242. this.log(
  3243. "ERROR",
  3244. "STATIONS_REMOVE_EXCLUDED_PLAYLIST",
  3245. `Removing excluded playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  3246. );
  3247. return cb({ status: "error", message: err });
  3248. }
  3249. this.log(
  3250. "SUCCESS",
  3251. "STATIONS_REMOVE_EXCLUDED_PLAYLIST",
  3252. `Removing excluded playlist "${playlistId}" for station "${stationId}" successfully.`
  3253. );
  3254. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  3255. CacheModule.runJob("PUB", {
  3256. channel: "station.removedExcludedPlaylist",
  3257. value: {
  3258. playlistId,
  3259. stationId
  3260. }
  3261. });
  3262. return cb({
  3263. status: "success",
  3264. message: "Successfully removed excluded playlist."
  3265. });
  3266. }
  3267. );
  3268. }),
  3269. /**
  3270. * Selects a private playlist for a station
  3271. *
  3272. * @param session
  3273. * @param stationId - the station id
  3274. * @param playlistId - the private playlist id
  3275. * @param cb
  3276. */
  3277. selectPrivatePlaylist: isOwnerRequired(async function selectPrivatePlaylist(session, stationId, playlistId, cb) {
  3278. async.waterfall(
  3279. [
  3280. next => {
  3281. StationsModule.runJob("GET_STATION", { stationId }, this)
  3282. .then(station => next(null, station))
  3283. .catch(next);
  3284. },
  3285. (station, next) => {
  3286. if (!station) return next("Station not found.");
  3287. if (station.type !== "community") return next("Station is not a community station.");
  3288. if (station.includedPlaylists.indexOf(playlistId) !== -1)
  3289. return next("That playlist is already included.");
  3290. return next();
  3291. },
  3292. next => {
  3293. StationsModule.runJob("INCLUDE_PLAYLIST", { stationId, playlistId }, this)
  3294. .then(() => {
  3295. next();
  3296. })
  3297. .catch(next);
  3298. }
  3299. ],
  3300. async err => {
  3301. if (err) {
  3302. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3303. this.log(
  3304. "ERROR",
  3305. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  3306. `Selecting private playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  3307. );
  3308. return cb({ status: "error", message: err });
  3309. }
  3310. this.log(
  3311. "SUCCESS",
  3312. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  3313. `Selected private playlist "${playlistId}" for station "${stationId}" successfully.`
  3314. );
  3315. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  3316. return cb({
  3317. status: "success",
  3318. message: "Successfully selected playlist."
  3319. });
  3320. }
  3321. );
  3322. }),
  3323. /**
  3324. * Deselects the private playlist selected in a station
  3325. *
  3326. * @param session
  3327. * @param stationId - the station id
  3328. * @param cb
  3329. */
  3330. deselectPrivatePlaylist: isOwnerRequired(async function deselectPrivatePlaylist(
  3331. session,
  3332. stationId,
  3333. playlistId,
  3334. cb
  3335. ) {
  3336. async.waterfall(
  3337. [
  3338. next => {
  3339. StationsModule.runJob("GET_STATION", { stationId }, this)
  3340. .then(station => {
  3341. next(null, station);
  3342. })
  3343. .catch(next);
  3344. },
  3345. (station, next) => {
  3346. if (!station) return next("Station not found.");
  3347. if (station.type !== "community") return next("Station is not a community station.");
  3348. if (station.includedPlaylists.indexOf(playlistId) === -1)
  3349. return next("That playlist is not included.");
  3350. return next();
  3351. },
  3352. next => {
  3353. StationsModule.runJob("REMOVE_INCLUDED_PLAYLIST", { stationId, playlistId }, this)
  3354. .then(() => {
  3355. next();
  3356. })
  3357. .catch(next);
  3358. }
  3359. ],
  3360. async err => {
  3361. if (err) {
  3362. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3363. this.log(
  3364. "ERROR",
  3365. "STATIONS_DESELECT_PRIVATE_PLAYLIST",
  3366. `Deselecting private playlist for station "${stationId}" failed. "${err}"`
  3367. );
  3368. return cb({ status: "error", message: err });
  3369. }
  3370. this.log(
  3371. "SUCCESS",
  3372. "STATIONS_DESELECT_PRIVATE_PLAYLIST",
  3373. `Deselected private playlist for station "${stationId}" successfully.`
  3374. );
  3375. PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
  3376. return cb({
  3377. status: "success",
  3378. message: "Successfully deselected playlist."
  3379. });
  3380. }
  3381. );
  3382. }),
  3383. favoriteStation: isLoginRequired(async function favoriteStation(session, stationId, cb) {
  3384. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  3385. async.waterfall(
  3386. [
  3387. next => {
  3388. StationsModule.runJob("GET_STATION", { stationId }, this)
  3389. .then(station => next(null, station))
  3390. .catch(next);
  3391. },
  3392. (station, next) => {
  3393. if (!station) return next("Station not found.");
  3394. return StationsModule.runJob("CAN_USER_VIEW_STATION", { station, userId: session.userId }, this)
  3395. .then(canView => {
  3396. if (canView) return next(null, station);
  3397. return next("Insufficient permissions.");
  3398. })
  3399. .catch(err => next(err));
  3400. },
  3401. (station, next) => {
  3402. userModel.updateOne(
  3403. { _id: session.userId },
  3404. { $addToSet: { favoriteStations: stationId } },
  3405. (err, res) => next(err, station, res)
  3406. );
  3407. },
  3408. (station, res, next) => {
  3409. if (res.nModified === 0) return next("The station was already favorited.");
  3410. return next(null, station);
  3411. }
  3412. ],
  3413. async (err, station) => {
  3414. if (err) {
  3415. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3416. this.log("ERROR", "FAVORITE_STATION", `Favoriting station "${stationId}" failed. "${err}"`);
  3417. return cb({ status: "error", message: err });
  3418. }
  3419. this.log("SUCCESS", "FAVORITE_STATION", `Favorited station "${stationId}" successfully.`);
  3420. CacheModule.runJob("PUB", {
  3421. channel: "user.favoritedStation",
  3422. value: {
  3423. userId: session.userId,
  3424. stationId
  3425. }
  3426. });
  3427. ActivitiesModule.runJob("ADD_ACTIVITY", {
  3428. userId: session.userId,
  3429. type: "station__favorite",
  3430. payload: {
  3431. message: `Favorited station <stationId>${station.displayName}</stationId>`,
  3432. stationId
  3433. }
  3434. });
  3435. return cb({
  3436. status: "success",
  3437. message: "Succesfully favorited station."
  3438. });
  3439. }
  3440. );
  3441. }),
  3442. unfavoriteStation: isLoginRequired(async function unfavoriteStation(session, stationId, cb) {
  3443. const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, this);
  3444. async.waterfall(
  3445. [
  3446. next => {
  3447. userModel.updateOne({ _id: session.userId }, { $pull: { favoriteStations: stationId } }, next);
  3448. },
  3449. (res, next) => {
  3450. if (res.nModified === 0) return next("The station wasn't favorited.");
  3451. return next();
  3452. },
  3453. next => {
  3454. StationsModule.runJob("GET_STATION", { stationId }, this)
  3455. .then(station => next(null, station))
  3456. .catch(next);
  3457. }
  3458. ],
  3459. async (err, station) => {
  3460. if (err) {
  3461. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3462. this.log("ERROR", "UNFAVORITE_STATION", `Unfavoriting station "${stationId}" failed. "${err}"`);
  3463. return cb({ status: "error", message: err });
  3464. }
  3465. this.log("SUCCESS", "UNFAVORITE_STATION", `Unfavorited station "${stationId}" successfully.`);
  3466. CacheModule.runJob("PUB", {
  3467. channel: "user.unfavoritedStation",
  3468. value: {
  3469. userId: session.userId,
  3470. stationId
  3471. }
  3472. });
  3473. ActivitiesModule.runJob("ADD_ACTIVITY", {
  3474. userId: session.userId,
  3475. type: "station__unfavorite",
  3476. payload: {
  3477. message: `Unfavorited station <stationId>${station.displayName}</stationId>`,
  3478. stationId
  3479. }
  3480. });
  3481. return cb({
  3482. status: "success",
  3483. message: "Succesfully unfavorited station."
  3484. });
  3485. }
  3486. );
  3487. }),
  3488. /**
  3489. * Clears every station queue
  3490. *
  3491. * @param {object} session - the session object automatically added by socket.io
  3492. * @param {Function} cb - gets called with the result
  3493. */
  3494. clearEveryStationQueue: isAdminRequired(async function clearEveryStationQueue(session, cb) {
  3495. async.waterfall(
  3496. [
  3497. next => {
  3498. StationsModule.runJob("CLEAR_EVERY_STATION_QUEUE", {}, this)
  3499. .then(() => next())
  3500. .catch(next);
  3501. }
  3502. ],
  3503. async err => {
  3504. if (err) {
  3505. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3506. this.log("ERROR", "CLEAR_EVERY_STATION_QUEUE", `Clearing every station queue failed. "${err}"`);
  3507. return cb({ status: "error", message: err });
  3508. }
  3509. this.log("SUCCESS", "CLEAR_EVERY_STATION_QUEUE", "Clearing every station queue was successful.");
  3510. return cb({ status: "success", message: "Successfully cleared every station queue." });
  3511. }
  3512. );
  3513. }),
  3514. /**
  3515. * Clears and refills a station queue
  3516. *
  3517. * @param {object} session - the session object automatically added by socket.io
  3518. * @param {string} stationId - the station id
  3519. * @param {Function} cb - gets called with the result
  3520. */
  3521. clearAndRefillStationQueue: isAdminRequired(async function clearAndRefillStationQueue(session, stationId, cb) {
  3522. async.waterfall(
  3523. [
  3524. next => {
  3525. StationsModule.runJob("CLEAR_AND_REFILL_STATION_QUEUE", { stationId }, this)
  3526. .then(() => next())
  3527. .catch(next);
  3528. }
  3529. ],
  3530. async err => {
  3531. if (err) {
  3532. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3533. this.log(
  3534. "ERROR",
  3535. "CLEAR_AND_REFILL_STATION_QUEUE",
  3536. `Clearing and refilling station queue failed. "${err}"`
  3537. );
  3538. return cb({ status: "error", message: err });
  3539. }
  3540. this.log(
  3541. "SUCCESS",
  3542. "CLEAR_AND_REFILL_STATION_QUEUE",
  3543. "Clearing and refilling station queue was successful."
  3544. );
  3545. return cb({ status: "success", message: "Successfully cleared and refilled station queue." });
  3546. }
  3547. );
  3548. }),
  3549. /**
  3550. * Gets skip votes for a station
  3551. *
  3552. * @param session
  3553. * @param stationId - the station id
  3554. * @param stationId - the song id to get skipvotes for
  3555. * @param cb
  3556. */
  3557. getSkipVotes: isLoginRequired(async function getSkipVotes(session, stationId, songId, cb) {
  3558. async.waterfall(
  3559. [
  3560. next => {
  3561. StationsModule.runJob("GET_STATION", { stationId }, this)
  3562. .then(res => next(null, res.currentSong))
  3563. .catch(console.log);
  3564. },
  3565. (currentSong, next) => {
  3566. if (currentSong && currentSong._id === songId)
  3567. next(null, {
  3568. skipVotes: currentSong.skipVotes.length,
  3569. skipVotesCurrent: true
  3570. });
  3571. else
  3572. next(null, {
  3573. skipVotes: 0,
  3574. skipVotesCurrent: false
  3575. });
  3576. }
  3577. ],
  3578. async (err, data) => {
  3579. if (err) {
  3580. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  3581. this.log(
  3582. "ERROR",
  3583. "STATIONS_GET_SKIP_VOTES",
  3584. `User "${session.userId}" failed to get skip votes for ${stationId}. "${err}"`
  3585. );
  3586. return cb({ status: "error", message: err });
  3587. }
  3588. const { skipVotes, skipVotesCurrent } = data;
  3589. return cb({
  3590. status: "success",
  3591. data: {
  3592. skipVotes,
  3593. skipVotesCurrent
  3594. }
  3595. });
  3596. }
  3597. );
  3598. })
  3599. };