stations.js 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078
  1. import async from "async";
  2. import CoreClass from "../core";
  // Module-level handles shared by every job in this file.
  // StationsModule is assigned in the constructor; the remaining handles are
  // resolved from the module manager inside initialize() and are undefined before that.
  let StationsModule;
  let CacheModule;
  let DBModule;
  let UtilsModule;
  let IOModule;
  let SongsModule;
  let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("stations");
  14. StationsModule = this;
  15. }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. IOModule = this.moduleManager.modules.io;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1
  35. };
  36. // TEMP
  37. CacheModule.runJob("SUB", {
  38. channel: "station.pause",
  39. cb: async stationId => {
  40. NotificationsModule.runJob("REMOVE", {
  41. subscription: `stations.nextSong?id=${stationId}`
  42. }).then();
  43. }
  44. });
  45. CacheModule.runJob("SUB", {
  46. channel: "station.resume",
  47. cb: async stationId => {
  48. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  49. }
  50. });
  51. CacheModule.runJob("SUB", {
  52. channel: "station.queueUpdate",
  53. cb: async stationId => {
  54. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  55. if (!station.currentSong && station.queue.length > 0) {
  56. StationsModule.runJob("INITIALIZE_STATION", {
  57. stationId
  58. }).then();
  59. }
  60. });
  61. }
  62. });
  63. CacheModule.runJob("SUB", {
  64. channel: "station.newOfficialPlaylist",
  65. cb: async stationId => {
  66. CacheModule.runJob("HGET", {
  67. table: "officialPlaylists",
  68. key: stationId
  69. }).then(playlistObj => {
  70. if (playlistObj) {
  71. IOModule.runJob("EMIT_TO_ROOM", {
  72. room: `station.${stationId}`,
  73. args: ["event:newOfficialPlaylist", playlistObj.songs]
  74. });
  75. }
  76. });
  77. }
  78. });
  79. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  80. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  81. return new Promise((resolve, reject) =>
  82. async.waterfall(
  83. [
  84. next => {
  85. this.setStage(2);
  86. CacheModule.runJob("HGETALL", { table: "stations" })
  87. .then(stations => {
  88. next(null, stations);
  89. })
  90. .catch(next);
  91. },
  92. (stations, next) => {
  93. this.setStage(3);
  94. if (!stations) return next();
  95. const stationIds = Object.keys(stations);
  96. return async.each(
  97. stationIds,
  98. (stationId, next) => {
  99. stationModel.findOne({ _id: stationId }, (err, station) => {
  100. if (err) next(err);
  101. else if (!station) {
  102. CacheModule.runJob("HDEL", {
  103. table: "stations",
  104. key: stationId
  105. })
  106. .then(() => {
  107. next();
  108. })
  109. .catch(next);
  110. } else next();
  111. });
  112. },
  113. next
  114. );
  115. },
  116. next => {
  117. this.setStage(4);
  118. stationModel.find({}, next);
  119. },
  120. (stations, next) => {
  121. this.setStage(5);
  122. async.each(
  123. stations,
  124. (station, next2) => {
  125. async.waterfall(
  126. [
  127. next => {
  128. CacheModule.runJob("HSET", {
  129. table: "stations",
  130. key: station._id,
  131. value: stationSchema(station)
  132. })
  133. .then(station => next(null, station))
  134. .catch(next);
  135. },
  136. (station, next) => {
  137. StationsModule.runJob(
  138. "INITIALIZE_STATION",
  139. {
  140. stationId: station._id
  141. },
  142. null,
  143. -1
  144. )
  145. .then(() => {
  146. next();
  147. })
  148. .catch(next);
  149. }
  150. ],
  151. err => {
  152. next2(err);
  153. }
  154. );
  155. },
  156. next
  157. );
  158. }
  159. ],
  160. async err => {
  161. if (err) {
  162. err = await UtilsModule.runJob("GET_ERROR", {
  163. error: err
  164. });
  165. reject(new Error(err));
  166. } else {
  167. resolve();
  168. }
  169. }
  170. )
  171. );
  172. }
  173. /**
  174. * Initialises a station
  175. *
  176. * @param {object} payload - object that contains the payload
  177. * @param {string} payload.stationId - id of the station to initialise
  178. * @returns {Promise} - returns a promise (resolve, reject)
  179. */
  180. INITIALIZE_STATION(payload) {
  181. return new Promise((resolve, reject) => {
  182. // if (typeof cb !== 'function') cb = ()=>{};
  183. async.waterfall(
  184. [
  185. next => {
  186. StationsModule.runJob(
  187. "GET_STATION",
  188. {
  189. stationId: payload.stationId
  190. },
  191. this
  192. )
  193. .then(station => {
  194. next(null, station);
  195. })
  196. .catch(next);
  197. },
  198. (station, next) => {
  199. if (!station) return next("Station not found.");
  200. return NotificationsModule.runJob(
  201. "UNSCHEDULE",
  202. {
  203. name: `stations.nextSong?id=${station._id}`
  204. },
  205. this
  206. )
  207. .then()
  208. .catch()
  209. .finally(() => {
  210. NotificationsModule.runJob(
  211. "SUBSCRIBE",
  212. {
  213. name: `stations.nextSong?id=${station._id}`,
  214. cb: () =>
  215. StationsModule.runJob("SKIP_STATION", {
  216. stationId: station._id
  217. }),
  218. unique: true,
  219. station
  220. },
  221. this
  222. )
  223. .then()
  224. .catch();
  225. if (station.paused) return next(true, station);
  226. return next(null, station);
  227. });
  228. },
  229. (station, next) => {
  230. if (!station.currentSong) {
  231. return StationsModule.runJob(
  232. "SKIP_STATION",
  233. {
  234. stationId: station._id
  235. },
  236. this
  237. )
  238. .then(station => {
  239. next(true, station);
  240. })
  241. .catch(next)
  242. .finally(() => {});
  243. }
  244. let timeLeft =
  245. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  246. if (Number.isNaN(timeLeft)) timeLeft = -1;
  247. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  248. return StationsModule.runJob(
  249. "SKIP_STATION",
  250. {
  251. stationId: station._id
  252. },
  253. this
  254. )
  255. .then(station => {
  256. next(null, station);
  257. })
  258. .catch(next);
  259. }
  260. // name, time, cb, station
  261. NotificationsModule.runJob("SCHEDULE", {
  262. name: `stations.nextSong?id=${station._id}`,
  263. time: timeLeft,
  264. station
  265. });
  266. return next(null, station);
  267. }
  268. ],
  269. async (err, station) => {
  270. if (err && err !== true) {
  271. err = await UtilsModule.runJob(
  272. "GET_ERROR",
  273. {
  274. error: err
  275. },
  276. this
  277. );
  278. reject(new Error(err));
  279. } else resolve(station);
  280. }
  281. );
  282. });
  283. }
  284. /**
  285. * Calculates the next song for the station
  286. *
  287. * @param {object} payload - object that contains the payload
  288. * @param {string} payload.station - station object to calculate song for
  289. * @returns {Promise} - returns a promise (resolve, reject)
  290. */
  291. async CALCULATE_SONG_FOR_STATION(payload) {
  292. // station, bypassValidate = false
  293. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  294. return new Promise((resolve, reject) => {
  295. const songList = [];
  296. return async.waterfall(
  297. [
  298. next => {
  299. if (payload.station.genres.length === 0) return next();
  300. const genresDone = [];
  301. return payload.station.genres.forEach(genre => {
  302. songModel.find({ genres: genre }, (err, songs) => {
  303. if (!err) {
  304. songs.forEach(song => {
  305. if (songList.indexOf(song._id) === -1) {
  306. let found = false;
  307. song.genres.forEach(songGenre => {
  308. if (payload.station.blacklistedGenres.indexOf(songGenre) !== -1)
  309. found = true;
  310. });
  311. if (!found) {
  312. songList.push(song._id);
  313. }
  314. }
  315. });
  316. }
  317. genresDone.push(genre);
  318. if (genresDone.length === payload.station.genres.length) next();
  319. });
  320. });
  321. },
  322. next => {
  323. const playlist = [];
  324. songList.forEach(songId => {
  325. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  326. });
  327. // eslint-disable-next-line array-callback-return
  328. payload.station.playlist.filter(songId => {
  329. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  330. });
  331. UtilsModule.runJob("SHUFFLE", { array: playlist })
  332. .then(result => {
  333. next(null, result.array);
  334. }, this)
  335. .catch(next);
  336. },
  337. (playlist, next) => {
  338. StationsModule.runJob(
  339. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  340. {
  341. stationId: payload.station._id,
  342. songList: playlist
  343. },
  344. this
  345. )
  346. .then(() => {
  347. next(null, playlist);
  348. })
  349. .catch(next);
  350. },
  351. (playlist, next) => {
  352. StationsModule.stationModel.updateOne(
  353. { _id: payload.station._id },
  354. { $set: { playlist } },
  355. { runValidators: true },
  356. () => {
  357. StationsModule.runJob(
  358. "UPDATE_STATION",
  359. {
  360. stationId: payload.station._id
  361. },
  362. this
  363. )
  364. .then(() => {
  365. next(null, playlist);
  366. })
  367. .catch(next);
  368. }
  369. );
  370. }
  371. ],
  372. (err, newPlaylist) => {
  373. if (err) return reject(new Error(err));
  374. return resolve(newPlaylist);
  375. }
  376. );
  377. });
  378. }
  379. /**
  380. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  381. *
  382. * @param {object} payload - object that contains the payload
  383. * @param {string} payload.stationId - id of the station
  384. * @returns {Promise} - returns a promise (resolve, reject)
  385. */
  386. GET_STATION(payload) {
  387. return new Promise((resolve, reject) => {
  388. async.waterfall(
  389. [
  390. next => {
  391. CacheModule.runJob(
  392. "HGET",
  393. {
  394. table: "stations",
  395. key: payload.stationId
  396. },
  397. this
  398. )
  399. .then(station => {
  400. next(null, station);
  401. })
  402. .catch(next);
  403. },
  404. (station, next) => {
  405. if (station) return next(true, station);
  406. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  407. },
  408. (station, next) => {
  409. if (station) {
  410. if (station.type === "official") {
  411. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  412. stationId: station._id,
  413. songList: station.playlist
  414. })
  415. .then()
  416. .catch();
  417. }
  418. station = StationsModule.stationSchema(station);
  419. CacheModule.runJob("HSET", {
  420. table: "stations",
  421. key: payload.stationId,
  422. value: station
  423. })
  424. .then()
  425. .catch();
  426. next(true, station);
  427. } else next("Station not found");
  428. }
  429. ],
  430. async (err, station) => {
  431. if (err && err !== true) {
  432. err = await UtilsModule.runJob(
  433. "GET_ERROR",
  434. {
  435. error: err
  436. },
  437. this
  438. );
  439. reject(new Error(err));
  440. } else resolve(station);
  441. }
  442. );
  443. });
  444. }
  445. /**
  446. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  447. *
  448. * @param {object} payload - object that contains the payload
  449. * @param {string} payload.stationName - the unique name of the station
  450. * @returns {Promise} - returns a promise (resolve, reject)
  451. */
  452. async GET_STATION_BY_NAME(payload) {
  453. return new Promise((resolve, reject) =>
  454. async.waterfall(
  455. [
  456. next => {
  457. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  458. },
  459. (station, next) => {
  460. if (station) {
  461. if (station.type === "official") {
  462. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  463. stationId: station._id,
  464. songList: station.playlist
  465. });
  466. }
  467. station = StationsModule.stationSchema(station);
  468. CacheModule.runJob("HSET", {
  469. table: "stations",
  470. key: station._id,
  471. value: station
  472. });
  473. next(true, station);
  474. } else next("Station not found");
  475. }
  476. ],
  477. (err, station) => {
  478. if (err && err !== true) return reject(new Error(err));
  479. return resolve(station);
  480. }
  481. )
  482. );
  483. }
  484. /**
  485. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  486. *
  487. * @param {object} payload - object that contains the payload
  488. * @param {string} payload.stationId - the id of the station to update
  489. * @returns {Promise} - returns a promise (resolve, reject)
  490. */
  491. UPDATE_STATION(payload) {
  492. return new Promise((resolve, reject) => {
  493. async.waterfall(
  494. [
  495. next => {
  496. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  497. },
  498. (station, next) => {
  499. if (!station) {
  500. CacheModule.runJob("HDEL", {
  501. table: "stations",
  502. key: payload.stationId
  503. })
  504. .then()
  505. .catch();
  506. return next("Station not found");
  507. }
  508. return CacheModule.runJob(
  509. "HSET",
  510. {
  511. table: "stations",
  512. key: payload.stationId,
  513. value: station
  514. },
  515. this
  516. )
  517. .then(station => {
  518. next(null, station);
  519. })
  520. .catch(next);
  521. }
  522. ],
  523. async (err, station) => {
  524. if (err && err !== true) {
  525. err = await UtilsModule.runJob(
  526. "GET_ERROR",
  527. {
  528. error: err
  529. },
  530. this
  531. );
  532. reject(new Error(err));
  533. } else resolve(station);
  534. }
  535. );
  536. });
  537. }
  538. /**
  539. * Creates the official playlist for a station
  540. *
  541. * @param {object} payload - object that contains the payload
  542. * @param {string} payload.stationId - the id of the station
  543. * @param {Array} payload.songList - list of songs to put in official playlist
  544. * @returns {Promise} - returns a promise (resolve, reject)
  545. */
  546. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  547. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  548. console.log(typeof payload.songList, payload.songList);
  549. return new Promise(resolve => {
  550. const lessInfoPlaylist = [];
  551. return async.each(
  552. payload.songList,
  553. (song, next) => {
  554. SongsModule.runJob("GET_SONG", { id: song }, this)
  555. .then(response => {
  556. const { song } = response;
  557. if (song) {
  558. const newSong = {
  559. songId: song.songId,
  560. title: song.title,
  561. artists: song.artists,
  562. duration: song.duration
  563. };
  564. lessInfoPlaylist.push(newSong);
  565. }
  566. })
  567. .finally(() => {
  568. next();
  569. });
  570. },
  571. () => {
  572. CacheModule.runJob(
  573. "HSET",
  574. {
  575. table: "officialPlaylists",
  576. key: payload.stationId,
  577. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  578. },
  579. this
  580. ).finally(() => {
  581. CacheModule.runJob("PUB", {
  582. channel: "station.newOfficialPlaylist",
  583. value: payload.stationId
  584. });
  585. resolve();
  586. });
  587. }
  588. );
  589. });
  590. }
  591. /**
  592. * Skips a station
  593. *
  594. * @param {object} payload - object that contains the payload
  595. * @param {string} payload.stationId - the id of the station to skip
  596. * @returns {Promise} - returns a promise (resolve, reject)
  597. */
  598. SKIP_STATION(payload) {
  599. return new Promise((resolve, reject) => {
  600. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  601. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  602. async.waterfall(
  603. [
  604. next => {
  605. NotificationsModule.runJob("UNSCHEDULE", {
  606. name: `stations.nextSong?id=${payload.stationId}`
  607. })
  608. .then(() => {
  609. next();
  610. })
  611. .catch(next);
  612. },
  613. next => {
  614. StationsModule.runJob(
  615. "GET_STATION",
  616. {
  617. stationId: payload.stationId
  618. },
  619. this
  620. )
  621. .then(station => {
  622. next(null, station);
  623. })
  624. .catch(() => {});
  625. },
  626. // eslint-disable-next-line consistent-return
  627. (station, next) => {
  628. if (!station) return next("Station not found.");
  629. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  630. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  631. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  632. // Community station with party mode enabled and songs in the queue
  633. if (station.paused) return next(null, null, -19, station);
  634. return StationsModule.stationModel.updateOne(
  635. { _id: payload.stationId },
  636. {
  637. $pull: {
  638. queue: {
  639. _id: station.queue[0]._id
  640. }
  641. }
  642. },
  643. err => {
  644. if (err) return next(err);
  645. return next(null, station.queue[0], -12, station);
  646. }
  647. );
  648. }
  649. if (station.type === "community" && !station.partyMode) {
  650. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  651. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  652. if (err) return next(err);
  653. if (!playlist) return next(null, null, -13, station);
  654. playlist = playlist.songs;
  655. if (playlist.length > 0) {
  656. let currentSongIndex;
  657. if (station.currentSongIndex < playlist.length - 1)
  658. currentSongIndex = station.currentSongIndex + 1;
  659. else currentSongIndex = 0;
  660. const callback = (err, song) => {
  661. if (err) return next(err);
  662. if (song) return next(null, song, currentSongIndex, station);
  663. const currentSong = {
  664. songId: playlist[currentSongIndex].songId,
  665. title: playlist[currentSongIndex].title,
  666. duration: playlist[currentSongIndex].duration,
  667. likes: -1,
  668. dislikes: -1
  669. };
  670. return next(null, currentSong, currentSongIndex, station);
  671. };
  672. if (playlist[currentSongIndex]._id)
  673. return SongsModule.runJob(
  674. "GET_SONG",
  675. {
  676. id: playlist[currentSongIndex]._id
  677. },
  678. this
  679. )
  680. .then(response => callback(null, response.song))
  681. .catch(callback);
  682. return SongsModule.runJob(
  683. "GET_SONG_FROM_ID",
  684. {
  685. songId: playlist[currentSongIndex].songId
  686. },
  687. this
  688. )
  689. .then(response => callback(null, response.song))
  690. .catch(callback);
  691. }
  692. return next(null, null, -14, station);
  693. })
  694. );
  695. }
  696. if (station.type === "official" && station.playlist.length === 0) {
  697. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  698. .then(playlist => {
  699. if (playlist.length === 0)
  700. return next(null, StationsModule.defaultSong, 0, station);
  701. return SongsModule.runJob(
  702. "GET_SONG",
  703. {
  704. id: playlist[0]
  705. },
  706. this
  707. )
  708. .then(response => {
  709. next(null, response.song, 0, station);
  710. })
  711. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  712. })
  713. .catch(err => {
  714. next(err);
  715. });
  716. }
  717. if (station.type === "official" && station.playlist.length > 0) {
  718. return async.doUntil(
  719. next => {
  720. if (station.currentSongIndex < station.playlist.length - 1) {
  721. SongsModule.runJob(
  722. "GET_SONG",
  723. {
  724. id: station.playlist[station.currentSongIndex + 1]
  725. },
  726. this
  727. )
  728. .then(response => next(null, response.song, station.currentSongIndex + 1))
  729. .catch(() => {
  730. station.currentSongIndex += 1;
  731. next(null, null, null);
  732. });
  733. } else {
  734. StationsModule.runJob(
  735. "CALCULATE_SONG_FOR_STATION",
  736. {
  737. station
  738. },
  739. this
  740. )
  741. .then(newPlaylist => {
  742. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  743. .then(response => {
  744. station.playlist = newPlaylist;
  745. next(null, response.song, 0);
  746. })
  747. .catch(() => next(null, StationsModule.defaultSong, 0));
  748. })
  749. .catch(() => {
  750. next(null, StationsModule.defaultSong, 0);
  751. });
  752. }
  753. },
  754. (song, currentSongIndex, next) => {
  755. if (song) return next(null, true, currentSongIndex);
  756. return next(null, false);
  757. },
  758. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  759. );
  760. }
  761. },
  762. (song, currentSongIndex, station, next) => {
  763. const $set = {};
  764. if (song === null) $set.currentSong = null;
  765. else if (song.likes === -1 && song.dislikes === -1) {
  766. $set.currentSong = {
  767. songId: song.songId,
  768. title: song.title,
  769. duration: song.duration,
  770. skipDuration: 0,
  771. likes: -1,
  772. dislikes: -1
  773. };
  774. } else {
  775. $set.currentSong = {
  776. songId: song.songId,
  777. title: song.title,
  778. artists: song.artists,
  779. duration: song.duration,
  780. likes: song.likes,
  781. dislikes: song.dislikes,
  782. skipDuration: song.skipDuration,
  783. thumbnail: song.thumbnail
  784. };
  785. }
  786. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  787. $set.startedAt = Date.now();
  788. $set.timePaused = 0;
  789. if (station.paused) $set.pausedAt = Date.now();
  790. next(null, $set, station);
  791. },
  792. ($set, station, next) => {
  793. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  794. StationsModule.runJob(
  795. "UPDATE_STATION",
  796. {
  797. stationId: station._id
  798. },
  799. this
  800. )
  801. .then(station => {
  802. if (station.type === "community" && station.partyMode === true)
  803. CacheModule.runJob("PUB", {
  804. channel: "station.queueUpdate",
  805. value: payload.stationId
  806. })
  807. .then()
  808. .catch();
  809. next(null, station);
  810. })
  811. .catch(next);
  812. });
  813. }
  814. ],
  815. async (err, station) => {
  816. if (err) {
  817. err = await UtilsModule.runJob(
  818. "GET_ERROR",
  819. {
  820. error: err
  821. },
  822. this
  823. );
  824. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  825. reject(new Error(err));
  826. } else {
  827. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  828. station.currentSong.skipVotes = 0;
  829. }
  830. // TODO Pub/Sub this
  831. IOModule.runJob("EMIT_TO_ROOM", {
  832. room: `station.${station._id}`,
  833. args: [
  834. "event:songs.next",
  835. {
  836. currentSong: station.currentSong,
  837. startedAt: station.startedAt,
  838. paused: station.paused,
  839. timePaused: 0
  840. }
  841. ]
  842. })
  843. .then()
  844. .catch();
  845. if (station.privacy === "public") {
  846. IOModule.runJob("EMIT_TO_ROOM", {
  847. room: "home",
  848. args: ["event:station.nextSong", station._id, station.currentSong]
  849. })
  850. .then()
  851. .catch();
  852. } else {
  853. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  854. Object.keys(sockets).forEach(socketKey => {
  855. const socket = sockets[socketKey];
  856. const { session } = socket;
  857. if (session.sessionId) {
  858. CacheModule.runJob(
  859. "HGET",
  860. {
  861. table: "sessions",
  862. key: session.sessionId
  863. },
  864. this
  865. // eslint-disable-next-line no-loop-func
  866. ).then(session => {
  867. if (session) {
  868. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  869. userModel => {
  870. userModel.findOne(
  871. {
  872. _id: session.userId
  873. },
  874. (err, user) => {
  875. if (!err && user) {
  876. if (user.role === "admin")
  877. socket.emit(
  878. "event:station.nextSong",
  879. station._id,
  880. station.currentSong
  881. );
  882. else if (
  883. station.type === "community" &&
  884. station.owner === session.userId
  885. )
  886. socket.emit(
  887. "event:station.nextSong",
  888. station._id,
  889. station.currentSong
  890. );
  891. }
  892. }
  893. );
  894. }
  895. );
  896. }
  897. });
  898. }
  899. });
  900. }
  901. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  902. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  903. sockets: await IOModule.runJob(
  904. "GET_ROOM_SOCKETS",
  905. {
  906. room: `station.${station._id}`
  907. },
  908. this
  909. ),
  910. room: `song.${station.currentSong.songId}`
  911. });
  912. if (!station.paused) {
  913. NotificationsModule.runJob("SCHEDULE", {
  914. name: `stations.nextSong?id=${station._id}`,
  915. time: station.currentSong.duration * 1000,
  916. station
  917. });
  918. }
  919. } else {
  920. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  921. sockets: await IOModule.runJob(
  922. "GET_ROOM_SOCKETS",
  923. {
  924. room: `station.${station._id}`
  925. },
  926. this
  927. )
  928. })
  929. .then()
  930. .catch();
  931. }
  932. resolve({ station });
  933. }
  934. }
  935. );
  936. });
  937. }
  938. /**
  939. * Checks if a user can view/access a station
  940. *
  941. * @param {object} payload - object that contains the payload
  942. * @param {object} payload.station - the station object of the station in question
  943. * @param {string} payload.userId - the id of the user in question
  944. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  945. * @returns {Promise} - returns a promise (resolve, reject)
  946. */
  947. CAN_USER_VIEW_STATION(payload) {
  948. return new Promise((resolve, reject) => {
  949. async.waterfall(
  950. [
  951. next => {
  952. if (payload.station.privacy === "public") return next(true);
  953. if (payload.station.privacy === "unlisted")
  954. if (payload.hideUnlisted === true) return next();
  955. else return next(true);
  956. if (!payload.userId) return next("Not allowed");
  957. return next();
  958. },
  959. next => {
  960. DBModule.runJob(
  961. "GET_MODEL",
  962. {
  963. modelName: "user"
  964. },
  965. this
  966. ).then(userModel => {
  967. userModel.findOne({ _id: payload.userId }, next);
  968. });
  969. },
  970. (user, next) => {
  971. if (!user) return next("Not allowed");
  972. if (user.role === "admin") return next(true);
  973. if (payload.station.type === "official") return next("Not allowed");
  974. if (payload.station.owner === payload.userId) return next(true);
  975. return next("Not allowed");
  976. }
  977. ],
  978. async errOrResult => {
  979. if (errOrResult !== true && errOrResult !== "Not allowed") {
  980. errOrResult = await UtilsModule.runJob(
  981. "GET_ERROR",
  982. {
  983. error: errOrResult
  984. },
  985. this
  986. );
  987. reject(new Error(errOrResult));
  988. } else {
  989. resolve(errOrResult === true);
  990. }
  991. }
  992. );
  993. });
  994. }
  995. }
  996. export default new _StationsModule();