stations.js 27 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let SongsModule;
  8. let NotificationsModule;
  9. class _StationsModule extends CoreClass {
  10. // eslint-disable-next-line require-jsdoc
  11. constructor() {
  12. super("stations");
  13. StationsModule = this;
  14. }
  15. /**
  16. * Initialises the stations module
  17. *
  18. * @returns {Promise} - returns promise (reject, resolve)
  19. */
  20. async initialize() {
  21. CacheModule = this.moduleManager.modules.cache;
  22. DBModule = this.moduleManager.modules.db;
  23. UtilsModule = this.moduleManager.modules.utils;
  24. SongsModule = this.moduleManager.modules.songs;
  25. NotificationsModule = this.moduleManager.modules.notifications;
  26. this.defaultSong = {
  27. songId: "60ItHLz5WEA",
  28. title: "Faded - Alan Walker",
  29. duration: 212,
  30. skipDuration: 0,
  31. likes: -1,
  32. dislikes: -1
  33. };
  34. // TEMP
  35. CacheModule.runJob("SUB", {
  36. channel: "station.pause",
  37. cb: async stationId => {
  38. NotificationsModule.runJob("REMOVE", {
  39. subscription: `stations.nextSong?id=${stationId}`
  40. }).then();
  41. }
  42. });
  43. CacheModule.runJob("SUB", {
  44. channel: "station.resume",
  45. cb: async stationId => {
  46. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  47. }
  48. });
  49. CacheModule.runJob("SUB", {
  50. channel: "station.queueUpdate",
  51. cb: async stationId => {
  52. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  53. if (!station.currentSong && station.queue.length > 0) {
  54. StationsModule.runJob("INITIALIZE_STATION", {
  55. stationId
  56. }).then();
  57. }
  58. });
  59. }
  60. });
  61. CacheModule.runJob("SUB", {
  62. channel: "station.newOfficialPlaylist",
  63. cb: async stationId => {
  64. CacheModule.runJob("HGET", {
  65. table: "officialPlaylists",
  66. key: stationId
  67. }).then(playlistObj => {
  68. if (playlistObj) {
  69. UtilsModule.runJob("EMIT_TO_ROOM", {
  70. room: `station.${stationId}`,
  71. args: ["event:newOfficialPlaylist", playlistObj.songs]
  72. });
  73. }
  74. });
  75. }
  76. });
  77. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  78. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  79. return new Promise((resolve, reject) =>
  80. async.waterfall(
  81. [
  82. next => {
  83. this.setStage(2);
  84. CacheModule.runJob("HGETALL", { table: "stations" })
  85. .then(stations => {
  86. next(null, stations);
  87. })
  88. .catch(next);
  89. },
  90. (stations, next) => {
  91. this.setStage(3);
  92. if (!stations) return next();
  93. const stationIds = Object.keys(stations);
  94. return async.each(
  95. stationIds,
  96. (stationId, next) => {
  97. stationModel.findOne({ _id: stationId }, (err, station) => {
  98. if (err) next(err);
  99. else if (!station) {
  100. CacheModule.runJob("HDEL", {
  101. table: "stations",
  102. key: stationId
  103. })
  104. .then(() => {
  105. next();
  106. })
  107. .catch(next);
  108. } else next();
  109. });
  110. },
  111. next
  112. );
  113. },
  114. next => {
  115. this.setStage(4);
  116. stationModel.find({}, next);
  117. },
  118. (stations, next) => {
  119. this.setStage(5);
  120. async.each(
  121. stations,
  122. (station, next2) => {
  123. async.waterfall(
  124. [
  125. next => {
  126. CacheModule.runJob("HSET", {
  127. table: "stations",
  128. key: station._id,
  129. value: stationSchema(station)
  130. })
  131. .then(station => next(null, station))
  132. .catch(next);
  133. },
  134. (station, next) => {
  135. StationsModule.runJob(
  136. "INITIALIZE_STATION",
  137. {
  138. stationId: station._id
  139. },
  140. null,
  141. -1
  142. )
  143. .then(() => {
  144. next();
  145. })
  146. .catch(next);
  147. }
  148. ],
  149. err => {
  150. next2(err);
  151. }
  152. );
  153. },
  154. next
  155. );
  156. }
  157. ],
  158. async err => {
  159. if (err) {
  160. err = await UtilsModule.runJob("GET_ERROR", {
  161. error: err
  162. });
  163. reject(new Error(err));
  164. } else {
  165. resolve();
  166. }
  167. }
  168. )
  169. );
  170. }
  171. /**
  172. * Initialises a station
  173. *
  174. * @param {object} payload - object that contains the payload
  175. * @param {string} payload.stationId - id of the station to initialise
  176. * @returns {Promise} - returns a promise (resolve, reject)
  177. */
  178. INITIALIZE_STATION(payload) {
  179. return new Promise((resolve, reject) => {
  180. // if (typeof cb !== 'function') cb = ()=>{};
  181. async.waterfall(
  182. [
  183. next => {
  184. StationsModule.runJob(
  185. "GET_STATION",
  186. {
  187. stationId: payload.stationId
  188. },
  189. this
  190. )
  191. .then(station => {
  192. next(null, station);
  193. })
  194. .catch(next);
  195. },
  196. (station, next) => {
  197. if (!station) return next("Station not found.");
  198. NotificationsModule.runJob(
  199. "UNSCHEDULE",
  200. {
  201. name: `stations.nextSong?id=${station._id}`
  202. },
  203. this
  204. )
  205. .then()
  206. .catch()
  207. .finally(() => {
  208. NotificationsModule.runJob(
  209. "SUBSCRIBE",
  210. {
  211. name: `stations.nextSong?id=${station._id}`,
  212. cb: () =>
  213. StationsModule.runJob("SKIP_STATION", {
  214. stationId: station._id
  215. }),
  216. unique: true,
  217. station
  218. },
  219. this
  220. )
  221. .then()
  222. .catch();
  223. if (station.paused) return next(true, station);
  224. return next(null, station);
  225. });
  226. },
  227. (station, next) => {
  228. if (!station.currentSong) {
  229. return StationsModule.runJob(
  230. "SKIP_STATION",
  231. {
  232. stationId: station._id
  233. },
  234. this
  235. )
  236. .then(station => {
  237. next(true, station);
  238. })
  239. .catch(next)
  240. .finally(() => {});
  241. }
  242. let timeLeft =
  243. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  244. if (Number.isNaN(timeLeft)) timeLeft = -1;
  245. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  246. return StationsModule.runJob(
  247. "SKIP_STATION",
  248. {
  249. stationId: station._id
  250. },
  251. this
  252. )
  253. .then(station => {
  254. next(null, station);
  255. })
  256. .catch(next);
  257. }
  258. // name, time, cb, station
  259. NotificationsModule.runJob("SCHEDULE", {
  260. name: `stations.nextSong?id=${station._id}`,
  261. time: timeLeft,
  262. station
  263. });
  264. return next(null, station);
  265. }
  266. ],
  267. async (err, station) => {
  268. if (err && err !== true) {
  269. err = await UtilsModule.runJob(
  270. "GET_ERROR",
  271. {
  272. error: err
  273. },
  274. this
  275. );
  276. reject(new Error(err));
  277. } else resolve(station);
  278. }
  279. );
  280. });
  281. }
  282. /**
  283. * Calculates the next song for the station
  284. *
  285. * @param {object} payload - object that contains the payload
  286. * @param {string} payload.station - station object to calculate song for
  287. * @returns {Promise} - returns a promise (resolve, reject)
  288. */
  289. async CALCULATE_SONG_FOR_STATION(payload) {
  290. // station, bypassValidate = false
  291. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  292. return new Promise((resolve, reject) => {
  293. const songList = [];
  294. return async.waterfall(
  295. [
  296. next => {
  297. if (payload.station.genres.length === 0) return next();
  298. const genresDone = [];
  299. return payload.station.genres.forEach(genre => {
  300. songModel.find({ genres: genre }, (err, songs) => {
  301. if (!err) {
  302. songs.forEach(song => {
  303. if (songList.indexOf(song._id) === -1) {
  304. let found = false;
  305. song.genres.forEach(songGenre => {
  306. if (payload.station.blacklistedGenres.indexOf(songGenre) !== -1)
  307. found = true;
  308. });
  309. if (!found) {
  310. songList.push(song._id);
  311. }
  312. }
  313. });
  314. }
  315. genresDone.push(genre);
  316. if (genresDone.length === payload.station.genres.length) next();
  317. });
  318. });
  319. },
  320. next => {
  321. const playlist = [];
  322. songList.forEach(songId => {
  323. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  324. });
  325. // eslint-disable-next-line array-callback-return
  326. payload.station.playlist.filter(songId => {
  327. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  328. });
  329. UtilsModule.runJob("SHUFFLE", { array: playlist })
  330. .then(result => {
  331. next(null, result.array);
  332. }, this)
  333. .catch(next);
  334. },
  335. (playlist, next) => {
  336. StationsModule.runJob(
  337. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  338. {
  339. stationId: payload.station._id,
  340. songList: playlist
  341. },
  342. this
  343. )
  344. .then(() => {
  345. next(null, playlist);
  346. })
  347. .catch(next);
  348. },
  349. (playlist, next) => {
  350. StationsModule.stationModel.updateOne(
  351. { _id: payload.station._id },
  352. { $set: { playlist } },
  353. { runValidators: true },
  354. () => {
  355. StationsModule.runJob(
  356. "UPDATE_STATION",
  357. {
  358. stationId: payload.station._id
  359. },
  360. this
  361. )
  362. .then(() => {
  363. next(null, playlist);
  364. })
  365. .catch(next);
  366. }
  367. );
  368. }
  369. ],
  370. (err, newPlaylist) => {
  371. if (err) return reject(new Error(err));
  372. return resolve(newPlaylist);
  373. }
  374. );
  375. });
  376. }
  377. /**
  378. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  379. *
  380. * @param {object} payload - object that contains the payload
  381. * @param {string} payload.stationId - id of the station
  382. * @returns {Promise} - returns a promise (resolve, reject)
  383. */
  384. GET_STATION(payload) {
  385. return new Promise((resolve, reject) => {
  386. async.waterfall(
  387. [
  388. next => {
  389. CacheModule.runJob(
  390. "HGET",
  391. {
  392. table: "stations",
  393. key: payload.stationId
  394. },
  395. this
  396. )
  397. .then(station => {
  398. next(null, station);
  399. })
  400. .catch(next);
  401. },
  402. (station, next) => {
  403. if (station) return next(true, station);
  404. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  405. },
  406. (station, next) => {
  407. if (station) {
  408. if (station.type === "official") {
  409. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  410. stationId: station._id,
  411. songList: station.playlist
  412. })
  413. .then()
  414. .catch();
  415. }
  416. station = StationsModule.stationSchema(station);
  417. CacheModule.runJob("HSET", {
  418. table: "stations",
  419. key: payload.stationId,
  420. value: station
  421. })
  422. .then()
  423. .catch();
  424. next(true, station);
  425. } else next("Station not found");
  426. }
  427. ],
  428. async (err, station) => {
  429. if (err && err !== true) {
  430. err = await UtilsModule.runJob(
  431. "GET_ERROR",
  432. {
  433. error: err
  434. },
  435. this
  436. );
  437. reject(new Error(err));
  438. } else resolve(station);
  439. }
  440. );
  441. });
  442. }
  443. /**
  444. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  445. *
  446. * @param {object} payload - object that contains the payload
  447. * @param {string} payload.stationName - the unique name of the station
  448. * @returns {Promise} - returns a promise (resolve, reject)
  449. */
  450. async GET_STATION_BY_NAME(payload) {
  451. return new Promise((resolve, reject) =>
  452. async.waterfall(
  453. [
  454. next => {
  455. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  456. },
  457. (station, next) => {
  458. if (station) {
  459. if (station.type === "official") {
  460. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  461. stationId: station._id,
  462. songList: station.playlist
  463. });
  464. }
  465. station = StationsModule.stationSchema(station);
  466. CacheModule.runJob("HSET", {
  467. table: "stations",
  468. key: station._id,
  469. value: station
  470. });
  471. next(true, station);
  472. } else next("Station not found");
  473. }
  474. ],
  475. (err, station) => {
  476. if (err && err !== true) return reject(new Error(err));
  477. return resolve(station);
  478. }
  479. )
  480. );
  481. }
  482. /**
  483. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  484. *
  485. * @param {object} payload - object that contains the payload
  486. * @param {string} payload.stationId - the id of the station to update
  487. * @returns {Promise} - returns a promise (resolve, reject)
  488. */
  489. UPDATE_STATION(payload) {
  490. return new Promise((resolve, reject) => {
  491. async.waterfall(
  492. [
  493. next => {
  494. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  495. },
  496. (station, next) => {
  497. if (!station) {
  498. CacheModule.runJob("HDEL", {
  499. table: "stations",
  500. key: payload.stationId
  501. })
  502. .then()
  503. .catch();
  504. return next("Station not found");
  505. }
  506. return CacheModule.runJob(
  507. "HSET",
  508. {
  509. table: "stations",
  510. key: payload.stationId,
  511. value: station
  512. },
  513. this
  514. )
  515. .then(station => {
  516. next(null, station);
  517. })
  518. .catch(next);
  519. }
  520. ],
  521. async (err, station) => {
  522. if (err && err !== true) {
  523. err = await UtilsModule.runJob(
  524. "GET_ERROR",
  525. {
  526. error: err
  527. },
  528. this
  529. );
  530. reject(new Error(err));
  531. } else resolve(station);
  532. }
  533. );
  534. });
  535. }
  536. /**
  537. * Creates the official playlist for a station
  538. *
  539. * @param {object} payload - object that contains the payload
  540. * @param {string} payload.stationId - the id of the station
  541. * @param {Array} payload.songList - list of songs to put in official playlist
  542. * @returns {Promise} - returns a promise (resolve, reject)
  543. */
  544. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  545. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  546. console.log(typeof payload.songList, payload.songList);
  547. return new Promise(resolve => {
  548. const lessInfoPlaylist = [];
  549. return async.each(
  550. payload.songList,
  551. (song, next) => {
  552. SongsModule.runJob("GET_SONG", { id: song }, this)
  553. .then(response => {
  554. const { song } = response;
  555. if (song) {
  556. const newSong = {
  557. songId: song.songId,
  558. title: song.title,
  559. artists: song.artists,
  560. duration: song.duration
  561. };
  562. lessInfoPlaylist.push(newSong);
  563. }
  564. })
  565. .finally(() => {
  566. next();
  567. });
  568. },
  569. () => {
  570. CacheModule.runJob(
  571. "HSET",
  572. {
  573. table: "officialPlaylists",
  574. key: payload.stationId,
  575. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  576. },
  577. this
  578. ).finally(() => {
  579. CacheModule.runJob("PUB", {
  580. channel: "station.newOfficialPlaylist",
  581. value: payload.stationId
  582. });
  583. resolve();
  584. });
  585. }
  586. );
  587. });
  588. }
  589. /**
  590. * Skips a station
  591. *
  592. * @param {object} payload - object that contains the payload
  593. * @param {string} payload.stationId - the id of the station to skip
  594. * @returns {Promise} - returns a promise (resolve, reject)
  595. */
  596. SKIP_STATION(payload) {
  597. return new Promise((resolve, reject) => {
  598. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  599. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  600. async.waterfall(
  601. [
  602. next => {
  603. StationsModule.runJob(
  604. "GET_STATION",
  605. {
  606. stationId: payload.stationId
  607. },
  608. this
  609. )
  610. .then(station => {
  611. next(null, station);
  612. })
  613. .catch(() => {});
  614. },
  615. // eslint-disable-next-line consistent-return
  616. (station, next) => {
  617. if (!station) return next("Station not found.");
  618. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  619. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  620. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  621. // Community station with party mode enabled and songs in the queue
  622. if (station.paused) return next(null, null, -19, station);
  623. return StationsModule.stationModel.updateOne(
  624. { _id: payload.stationId },
  625. {
  626. $pull: {
  627. queue: {
  628. _id: station.queue[0]._id
  629. }
  630. }
  631. },
  632. err => {
  633. if (err) return next(err);
  634. return next(null, station.queue[0], -12, station);
  635. }
  636. );
  637. }
  638. if (station.type === "community" && !station.partyMode) {
  639. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  640. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  641. if (err) return next(err);
  642. if (!playlist) return next(null, null, -13, station);
  643. playlist = playlist.songs;
  644. if (playlist.length > 0) {
  645. let currentSongIndex;
  646. if (station.currentSongIndex < playlist.length - 1)
  647. currentSongIndex = station.currentSongIndex + 1;
  648. else currentSongIndex = 0;
  649. const callback = (err, song) => {
  650. if (err) return next(err);
  651. if (song) return next(null, song, currentSongIndex, station);
  652. const currentSong = {
  653. songId: playlist[currentSongIndex].songId,
  654. title: playlist[currentSongIndex].title,
  655. duration: playlist[currentSongIndex].duration,
  656. likes: -1,
  657. dislikes: -1
  658. };
  659. return next(null, currentSong, currentSongIndex, station);
  660. };
  661. if (playlist[currentSongIndex]._id)
  662. return SongsModule.runJob(
  663. "GET_SONG",
  664. {
  665. id: playlist[currentSongIndex]._id
  666. },
  667. this
  668. )
  669. .then(response => callback(null, response.song))
  670. .catch(callback);
  671. return SongsModule.runJob(
  672. "GET_SONG_FROM_ID",
  673. {
  674. songId: playlist[currentSongIndex].songId
  675. },
  676. this
  677. )
  678. .then(response => callback(null, response.song))
  679. .catch(callback);
  680. }
  681. return next(null, null, -14, station);
  682. })
  683. );
  684. }
  685. if (station.type === "official" && station.playlist.length === 0) {
  686. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  687. .then(playlist => {
  688. if (playlist.length === 0)
  689. return next(null, StationsModule.defaultSong, 0, station);
  690. return SongsModule.runJob(
  691. "GET_SONG",
  692. {
  693. id: playlist[0]
  694. },
  695. this
  696. )
  697. .then(response => {
  698. next(null, response.song, 0, station);
  699. })
  700. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  701. })
  702. .catch(err => {
  703. next(err);
  704. });
  705. }
  706. if (station.type === "official" && station.playlist.length > 0) {
  707. return async.doUntil(
  708. next => {
  709. if (station.currentSongIndex < station.playlist.length - 1) {
  710. SongsModule.runJob(
  711. "GET_SONG",
  712. {
  713. id: station.playlist[station.currentSongIndex + 1]
  714. },
  715. this
  716. )
  717. .then(response => next(null, response.song, station.currentSongIndex + 1))
  718. .catch(() => {
  719. station.currentSongIndex += 1;
  720. next(null, null, null);
  721. });
  722. } else {
  723. StationsModule.runJob(
  724. "CALCULATE_SONG_FOR_STATION",
  725. {
  726. station
  727. },
  728. this
  729. )
  730. .then(newPlaylist => {
  731. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  732. .then(response => {
  733. station.playlist = newPlaylist;
  734. next(null, response.song, 0);
  735. })
  736. .catch(() => next(null, StationsModule.defaultSong, 0));
  737. })
  738. .catch(() => {
  739. next(null, StationsModule.defaultSong, 0);
  740. });
  741. }
  742. },
  743. (song, currentSongIndex, next) => {
  744. if (song) return next(null, true, currentSongIndex);
  745. return next(null, false);
  746. },
  747. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  748. );
  749. }
  750. },
  751. (song, currentSongIndex, station, next) => {
  752. const $set = {};
  753. if (song === null) $set.currentSong = null;
  754. else if (song.likes === -1 && song.dislikes === -1) {
  755. $set.currentSong = {
  756. songId: song.songId,
  757. title: song.title,
  758. duration: song.duration,
  759. skipDuration: 0,
  760. likes: -1,
  761. dislikes: -1
  762. };
  763. } else {
  764. $set.currentSong = {
  765. songId: song.songId,
  766. title: song.title,
  767. artists: song.artists,
  768. duration: song.duration,
  769. likes: song.likes,
  770. dislikes: song.dislikes,
  771. skipDuration: song.skipDuration,
  772. thumbnail: song.thumbnail
  773. };
  774. }
  775. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  776. $set.startedAt = Date.now();
  777. $set.timePaused = 0;
  778. if (station.paused) $set.pausedAt = Date.now();
  779. next(null, $set, station);
  780. },
  781. ($set, station, next) => {
  782. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  783. StationsModule.runJob(
  784. "UPDATE_STATION",
  785. {
  786. stationId: station._id
  787. },
  788. this
  789. )
  790. .then(station => {
  791. if (station.type === "community" && station.partyMode === true)
  792. CacheModule.runJob("PUB", {
  793. channel: "station.queueUpdate",
  794. value: payload.stationId
  795. })
  796. .then()
  797. .catch();
  798. next(null, station);
  799. })
  800. .catch(next);
  801. });
  802. }
  803. ],
  804. async (err, station) => {
  805. if (err) {
  806. err = await UtilsModule.runJob(
  807. "GET_ERROR",
  808. {
  809. error: err
  810. },
  811. this
  812. );
  813. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  814. reject(new Error(err));
  815. } else {
  816. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  817. station.currentSong.skipVotes = 0;
  818. }
  819. // TODO Pub/Sub this
  820. UtilsModule.runJob("EMIT_TO_ROOM", {
  821. room: `station.${station._id}`,
  822. args: [
  823. "event:songs.next",
  824. {
  825. currentSong: station.currentSong,
  826. startedAt: station.startedAt,
  827. paused: station.paused,
  828. timePaused: 0
  829. }
  830. ]
  831. })
  832. .then()
  833. .catch();
  834. if (station.privacy === "public") {
  835. UtilsModule.runJob("EMIT_TO_ROOM", {
  836. room: "home",
  837. args: ["event:station.nextSong", station._id, station.currentSong]
  838. })
  839. .then()
  840. .catch();
  841. } else {
  842. const sockets = await UtilsModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  843. Object.keys(sockets).forEach(socketKey => {
  844. const socket = sockets[socketKey];
  845. const { session } = socket;
  846. if (session.sessionId) {
  847. CacheModule.runJob(
  848. "HGET",
  849. {
  850. table: "sessions",
  851. key: session.sessionId
  852. },
  853. this
  854. // eslint-disable-next-line no-loop-func
  855. ).then(session => {
  856. if (session) {
  857. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  858. userModel => {
  859. userModel.findOne(
  860. {
  861. _id: session.userId
  862. },
  863. (err, user) => {
  864. if (!err && user) {
  865. if (user.role === "admin")
  866. socket.emit(
  867. "event:station.nextSong",
  868. station._id,
  869. station.currentSong
  870. );
  871. else if (
  872. station.type === "community" &&
  873. station.owner === session.userId
  874. )
  875. socket.emit(
  876. "event:station.nextSong",
  877. station._id,
  878. station.currentSong
  879. );
  880. }
  881. }
  882. );
  883. }
  884. );
  885. }
  886. });
  887. }
  888. });
  889. }
  890. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  891. UtilsModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  892. sockets: await UtilsModule.runJob(
  893. "GET_ROOM_SOCKETS",
  894. {
  895. room: `station.${station._id}`
  896. },
  897. this
  898. ),
  899. room: `song.${station.currentSong.songId}`
  900. });
  901. if (!station.paused) {
  902. NotificationsModule.runJob("SCHEDULE", {
  903. name: `stations.nextSong?id=${station._id}`,
  904. time: station.currentSong.duration * 1000,
  905. station
  906. });
  907. }
  908. } else {
  909. UtilsModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  910. sockets: await UtilsModule.runJob(
  911. "GET_ROOM_SOCKETS",
  912. {
  913. room: `station.${station._id}`
  914. },
  915. this
  916. )
  917. })
  918. .then()
  919. .catch();
  920. }
  921. resolve({ station });
  922. }
  923. }
  924. );
  925. });
  926. }
  927. /**
  928. * Checks if a user can view/access a station
  929. *
  930. * @param {object} payload - object that contains the payload
  931. * @param {object} payload.station - the station object of the station in question
  932. * @param {string} payload.userId - the id of the user in question
  933. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  934. * @returns {Promise} - returns a promise (resolve, reject)
  935. */
  936. CAN_USER_VIEW_STATION(payload) {
  937. return new Promise((resolve, reject) => {
  938. async.waterfall(
  939. [
  940. next => {
  941. if (payload.station.privacy === "public") return next(true);
  942. if (payload.station.privacy === "unlisted")
  943. if (payload.hideUnlisted === true) return next();
  944. else return next(true);
  945. if (!payload.userId) return next("Not allowed");
  946. return next();
  947. },
  948. next => {
  949. DBModule.runJob(
  950. "GET_MODEL",
  951. {
  952. modelName: "user"
  953. },
  954. this
  955. ).then(userModel => {
  956. userModel.findOne({ _id: payload.userId }, next);
  957. });
  958. },
  959. (user, next) => {
  960. if (!user) return next("Not allowed");
  961. if (user.role === "admin") return next(true);
  962. if (payload.station.type === "official") return next("Not allowed");
  963. if (payload.station.owner === payload.userId) return next(true);
  964. return next("Not allowed");
  965. }
  966. ],
  967. async errOrResult => {
  968. if (errOrResult !== true && errOrResult !== "Not allowed") {
  969. errOrResult = await UtilsModule.runJob(
  970. "GET_ERROR",
  971. {
  972. error: errOrResult
  973. },
  974. this
  975. );
  976. reject(new Error(errOrResult));
  977. } else {
  978. resolve(errOrResult === true);
  979. }
  980. }
  981. );
  982. });
  983. }
  984. }
  985. export default new _StationsModule();