stations.js 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081
  1. import async from "async";
  2. import CoreClass from "../core";
  // Module-level handles shared by every method in this file.
  // StationsModule is assigned in the constructor; the remaining handles are
  // assigned in initialize() from this.moduleManager.modules.
  let StationsModule;
  let CacheModule;
  let DBModule;
  let UtilsModule;
  let IOModule;
  let SongsModule;
  let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  /**
   * Passes the module name "stations" to the CoreClass constructor and stores the
   * new instance in the module-level StationsModule handle used by the job methods.
   */
  constructor() {
    super("stations");
    StationsModule = this;
  }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. IOModule = this.moduleManager.modules.io;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1
  35. };
  36. this.userList = {};
  37. this.usersPerStation = {};
  38. this.usersPerStationCount = {};
  39. // TEMP
  40. CacheModule.runJob("SUB", {
  41. channel: "station.pause",
  42. cb: async stationId => {
  43. NotificationsModule.runJob("REMOVE", {
  44. subscription: `stations.nextSong?id=${stationId}`
  45. }).then();
  46. }
  47. });
  48. CacheModule.runJob("SUB", {
  49. channel: "station.resume",
  50. cb: async stationId => {
  51. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  52. }
  53. });
  54. CacheModule.runJob("SUB", {
  55. channel: "station.queueUpdate",
  56. cb: async stationId => {
  57. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  58. if (!station.currentSong && station.queue.length > 0) {
  59. StationsModule.runJob("INITIALIZE_STATION", {
  60. stationId
  61. }).then();
  62. }
  63. });
  64. }
  65. });
  66. CacheModule.runJob("SUB", {
  67. channel: "station.newOfficialPlaylist",
  68. cb: async stationId => {
  69. CacheModule.runJob("HGET", {
  70. table: "officialPlaylists",
  71. key: stationId
  72. }).then(playlistObj => {
  73. if (playlistObj) {
  74. IOModule.runJob("EMIT_TO_ROOM", {
  75. room: `station.${stationId}`,
  76. args: ["event:newOfficialPlaylist", playlistObj.songs]
  77. });
  78. }
  79. });
  80. }
  81. });
  82. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  83. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  84. return new Promise((resolve, reject) =>
  85. async.waterfall(
  86. [
  87. next => {
  88. this.setStage(2);
  89. CacheModule.runJob("HGETALL", { table: "stations" })
  90. .then(stations => {
  91. next(null, stations);
  92. })
  93. .catch(next);
  94. },
  95. (stations, next) => {
  96. this.setStage(3);
  97. if (!stations) return next();
  98. const stationIds = Object.keys(stations);
  99. return async.each(
  100. stationIds,
  101. (stationId, next) => {
  102. stationModel.findOne({ _id: stationId }, (err, station) => {
  103. if (err) next(err);
  104. else if (!station) {
  105. CacheModule.runJob("HDEL", {
  106. table: "stations",
  107. key: stationId
  108. })
  109. .then(() => {
  110. next();
  111. })
  112. .catch(next);
  113. } else next();
  114. });
  115. },
  116. next
  117. );
  118. },
  119. next => {
  120. this.setStage(4);
  121. stationModel.find({}, next);
  122. },
  123. (stations, next) => {
  124. this.setStage(5);
  125. async.each(
  126. stations,
  127. (station, next2) => {
  128. async.waterfall(
  129. [
  130. next => {
  131. CacheModule.runJob("HSET", {
  132. table: "stations",
  133. key: station._id,
  134. value: stationSchema(station)
  135. })
  136. .then(station => next(null, station))
  137. .catch(next);
  138. },
  139. (station, next) => {
  140. StationsModule.runJob(
  141. "INITIALIZE_STATION",
  142. {
  143. stationId: station._id
  144. },
  145. null,
  146. -1
  147. )
  148. .then(() => {
  149. next();
  150. })
  151. .catch(next);
  152. }
  153. ],
  154. err => {
  155. next2(err);
  156. }
  157. );
  158. },
  159. next
  160. );
  161. }
  162. ],
  163. async err => {
  164. if (err) {
  165. err = await UtilsModule.runJob("GET_ERROR", {
  166. error: err
  167. });
  168. reject(new Error(err));
  169. } else {
  170. resolve();
  171. }
  172. }
  173. )
  174. );
  175. }
  176. /**
  177. * Initialises a station
  178. *
  179. * @param {object} payload - object that contains the payload
  180. * @param {string} payload.stationId - id of the station to initialise
  181. * @returns {Promise} - returns a promise (resolve, reject)
  182. */
  183. INITIALIZE_STATION(payload) {
  184. return new Promise((resolve, reject) => {
  185. // if (typeof cb !== 'function') cb = ()=>{};
  186. async.waterfall(
  187. [
  188. next => {
  189. StationsModule.runJob(
  190. "GET_STATION",
  191. {
  192. stationId: payload.stationId
  193. },
  194. this
  195. )
  196. .then(station => {
  197. next(null, station);
  198. })
  199. .catch(next);
  200. },
  201. (station, next) => {
  202. if (!station) return next("Station not found.");
  203. return NotificationsModule.runJob(
  204. "UNSCHEDULE",
  205. {
  206. name: `stations.nextSong?id=${station._id}`
  207. },
  208. this
  209. )
  210. .then()
  211. .catch()
  212. .finally(() => {
  213. NotificationsModule.runJob("SUBSCRIBE", {
  214. name: `stations.nextSong?id=${station._id}`,
  215. cb: () =>
  216. StationsModule.runJob("SKIP_STATION", {
  217. stationId: station._id
  218. }),
  219. unique: true,
  220. station
  221. })
  222. .then()
  223. .catch();
  224. if (station.paused) return next(true, station);
  225. return next(null, station);
  226. });
  227. },
  228. (station, next) => {
  229. if (!station.currentSong) {
  230. return StationsModule.runJob(
  231. "SKIP_STATION",
  232. {
  233. stationId: station._id
  234. },
  235. this
  236. )
  237. .then(station => {
  238. next(true, station);
  239. })
  240. .catch(next)
  241. .finally(() => {});
  242. }
  243. let timeLeft =
  244. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  245. if (Number.isNaN(timeLeft)) timeLeft = -1;
  246. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  247. return StationsModule.runJob(
  248. "SKIP_STATION",
  249. {
  250. stationId: station._id
  251. },
  252. this
  253. )
  254. .then(station => {
  255. next(null, station);
  256. })
  257. .catch(next);
  258. }
  259. // name, time, cb, station
  260. NotificationsModule.runJob("SCHEDULE", {
  261. name: `stations.nextSong?id=${station._id}`,
  262. time: timeLeft,
  263. station
  264. });
  265. return next(null, station);
  266. }
  267. ],
  268. async (err, station) => {
  269. if (err && err !== true) {
  270. err = await UtilsModule.runJob(
  271. "GET_ERROR",
  272. {
  273. error: err
  274. },
  275. this
  276. );
  277. reject(new Error(err));
  278. } else resolve(station);
  279. }
  280. );
  281. });
  282. }
  283. /**
  284. * Calculates the next song for the station
  285. *
  286. * @param {object} payload - object that contains the payload
  287. * @param {string} payload.station - station object to calculate song for
  288. * @returns {Promise} - returns a promise (resolve, reject)
  289. */
  290. async CALCULATE_SONG_FOR_STATION(payload) {
  291. // station, bypassValidate = false
  292. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  293. return new Promise((resolve, reject) => {
  294. const songList = [];
  295. return async.waterfall(
  296. [
  297. next => {
  298. if (payload.station.genres.length === 0) return next();
  299. const genresDone = [];
  300. const blacklistedGenres = payload.station.blacklistedGenres.map(blacklistedGenre =>
  301. blacklistedGenre.toLowerCase()
  302. );
  303. return payload.station.genres.forEach(genre => {
  304. songModel.find({ genres: { $regex: genre, $options: "i" } }, (err, songs) => {
  305. if (!err) {
  306. songs.forEach(song => {
  307. if (songList.indexOf(song._id) === -1) {
  308. let found = false;
  309. song.genres.forEach(songGenre => {
  310. if (blacklistedGenres.indexOf(songGenre.toLowerCase()) !== -1)
  311. found = true;
  312. });
  313. if (!found) {
  314. songList.push(song._id);
  315. }
  316. }
  317. });
  318. }
  319. genresDone.push(genre);
  320. if (genresDone.length === payload.station.genres.length) next();
  321. });
  322. });
  323. },
  324. next => {
  325. const playlist = [];
  326. songList.forEach(songId => {
  327. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  328. });
  329. // eslint-disable-next-line array-callback-return
  330. payload.station.playlist.filter(songId => {
  331. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  332. });
  333. UtilsModule.runJob("SHUFFLE", { array: playlist })
  334. .then(result => {
  335. next(null, result.array);
  336. }, this)
  337. .catch(next);
  338. },
  339. (playlist, next) => {
  340. StationsModule.runJob(
  341. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  342. {
  343. stationId: payload.station._id,
  344. songList: playlist
  345. },
  346. this
  347. )
  348. .then(() => {
  349. next(null, playlist);
  350. })
  351. .catch(next);
  352. },
  353. (playlist, next) => {
  354. StationsModule.stationModel.updateOne(
  355. { _id: payload.station._id },
  356. { $set: { playlist } },
  357. { runValidators: true },
  358. () => {
  359. StationsModule.runJob(
  360. "UPDATE_STATION",
  361. {
  362. stationId: payload.station._id
  363. },
  364. this
  365. )
  366. .then(() => {
  367. next(null, playlist);
  368. })
  369. .catch(next);
  370. }
  371. );
  372. }
  373. ],
  374. (err, newPlaylist) => {
  375. if (err) return reject(new Error(err));
  376. return resolve(newPlaylist);
  377. }
  378. );
  379. });
  380. }
  381. /**
  382. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  383. *
  384. * @param {object} payload - object that contains the payload
  385. * @param {string} payload.stationId - id of the station
  386. * @returns {Promise} - returns a promise (resolve, reject)
  387. */
  388. GET_STATION(payload) {
  389. return new Promise((resolve, reject) => {
  390. async.waterfall(
  391. [
  392. next => {
  393. CacheModule.runJob(
  394. "HGET",
  395. {
  396. table: "stations",
  397. key: payload.stationId
  398. },
  399. this
  400. )
  401. .then(station => {
  402. next(null, station);
  403. })
  404. .catch(next);
  405. },
  406. (station, next) => {
  407. if (station) return next(true, station);
  408. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  409. },
  410. (station, next) => {
  411. if (station) {
  412. if (station.type === "official") {
  413. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  414. stationId: station._id,
  415. songList: station.playlist
  416. })
  417. .then()
  418. .catch();
  419. }
  420. station = StationsModule.stationSchema(station);
  421. CacheModule.runJob("HSET", {
  422. table: "stations",
  423. key: payload.stationId,
  424. value: station
  425. })
  426. .then()
  427. .catch();
  428. next(true, station);
  429. } else next("Station not found");
  430. }
  431. ],
  432. async (err, station) => {
  433. if (err && err !== true) {
  434. err = await UtilsModule.runJob(
  435. "GET_ERROR",
  436. {
  437. error: err
  438. },
  439. this
  440. );
  441. reject(new Error(err));
  442. } else resolve(station);
  443. }
  444. );
  445. });
  446. }
  447. /**
  448. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  449. *
  450. * @param {object} payload - object that contains the payload
  451. * @param {string} payload.stationName - the unique name of the station
  452. * @returns {Promise} - returns a promise (resolve, reject)
  453. */
  454. async GET_STATION_BY_NAME(payload) {
  455. return new Promise((resolve, reject) =>
  456. async.waterfall(
  457. [
  458. next => {
  459. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  460. },
  461. (station, next) => {
  462. if (station) {
  463. if (station.type === "official") {
  464. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  465. stationId: station._id,
  466. songList: station.playlist
  467. });
  468. }
  469. station = StationsModule.stationSchema(station);
  470. CacheModule.runJob("HSET", {
  471. table: "stations",
  472. key: station._id,
  473. value: station
  474. });
  475. next(true, station);
  476. } else next("Station not found");
  477. }
  478. ],
  479. (err, station) => {
  480. if (err && err !== true) return reject(new Error(err));
  481. return resolve(station);
  482. }
  483. )
  484. );
  485. }
  486. /**
  487. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  488. *
  489. * @param {object} payload - object that contains the payload
  490. * @param {string} payload.stationId - the id of the station to update
  491. * @returns {Promise} - returns a promise (resolve, reject)
  492. */
  493. UPDATE_STATION(payload) {
  494. return new Promise((resolve, reject) => {
  495. async.waterfall(
  496. [
  497. next => {
  498. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  499. },
  500. (station, next) => {
  501. if (!station) {
  502. CacheModule.runJob("HDEL", {
  503. table: "stations",
  504. key: payload.stationId
  505. })
  506. .then()
  507. .catch();
  508. return next("Station not found");
  509. }
  510. return CacheModule.runJob(
  511. "HSET",
  512. {
  513. table: "stations",
  514. key: payload.stationId,
  515. value: station
  516. },
  517. this
  518. )
  519. .then(station => {
  520. next(null, station);
  521. })
  522. .catch(next);
  523. }
  524. ],
  525. async (err, station) => {
  526. if (err && err !== true) {
  527. err = await UtilsModule.runJob(
  528. "GET_ERROR",
  529. {
  530. error: err
  531. },
  532. this
  533. );
  534. reject(new Error(err));
  535. } else resolve(station);
  536. }
  537. );
  538. });
  539. }
  540. /**
  541. * Creates the official playlist for a station
  542. *
  543. * @param {object} payload - object that contains the payload
  544. * @param {string} payload.stationId - the id of the station
  545. * @param {Array} payload.songList - list of songs to put in official playlist
  546. * @returns {Promise} - returns a promise (resolve, reject)
  547. */
  548. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  549. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  550. console.log(typeof payload.songList, payload.songList);
  551. return new Promise(resolve => {
  552. const lessInfoPlaylist = [];
  553. return async.each(
  554. payload.songList,
  555. (song, next) => {
  556. SongsModule.runJob("GET_SONG", { id: song }, this)
  557. .then(response => {
  558. const { song } = response;
  559. if (song) {
  560. const newSong = {
  561. songId: song.songId,
  562. title: song.title,
  563. artists: song.artists,
  564. duration: song.duration
  565. };
  566. lessInfoPlaylist.push(newSong);
  567. }
  568. })
  569. .finally(() => {
  570. next();
  571. });
  572. },
  573. () => {
  574. CacheModule.runJob(
  575. "HSET",
  576. {
  577. table: "officialPlaylists",
  578. key: payload.stationId,
  579. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  580. },
  581. this
  582. ).finally(() => {
  583. CacheModule.runJob("PUB", {
  584. channel: "station.newOfficialPlaylist",
  585. value: payload.stationId
  586. });
  587. resolve();
  588. });
  589. }
  590. );
  591. });
  592. }
  593. /**
  594. * Skips a station
  595. *
  596. * @param {object} payload - object that contains the payload
  597. * @param {string} payload.stationId - the id of the station to skip
  598. * @returns {Promise} - returns a promise (resolve, reject)
  599. */
  600. SKIP_STATION(payload) {
  601. return new Promise((resolve, reject) => {
  602. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  603. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  604. async.waterfall(
  605. [
  606. next => {
  607. NotificationsModule.runJob("UNSCHEDULE", {
  608. name: `stations.nextSong?id=${payload.stationId}`
  609. })
  610. .then(() => {
  611. next();
  612. })
  613. .catch(next);
  614. },
  615. next => {
  616. StationsModule.runJob(
  617. "GET_STATION",
  618. {
  619. stationId: payload.stationId
  620. },
  621. this
  622. )
  623. .then(station => {
  624. next(null, station);
  625. })
  626. .catch(() => {});
  627. },
  628. // eslint-disable-next-line consistent-return
  629. (station, next) => {
  630. if (!station) return next("Station not found.");
  631. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  632. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  633. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  634. // Community station with party mode enabled and songs in the queue
  635. if (station.paused) return next(null, null, -19, station);
  636. return StationsModule.stationModel.updateOne(
  637. { _id: payload.stationId },
  638. {
  639. $pull: {
  640. queue: {
  641. _id: station.queue[0]._id
  642. }
  643. }
  644. },
  645. err => {
  646. if (err) return next(err);
  647. return next(null, station.queue[0], -12, station);
  648. }
  649. );
  650. }
  651. if (station.type === "community" && !station.partyMode) {
  652. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  653. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  654. if (err) return next(err);
  655. if (!playlist) return next(null, null, -13, station);
  656. playlist = playlist.songs;
  657. if (playlist.length > 0) {
  658. let currentSongIndex;
  659. if (station.currentSongIndex < playlist.length - 1)
  660. currentSongIndex = station.currentSongIndex + 1;
  661. else currentSongIndex = 0;
  662. const callback = (err, song) => {
  663. if (err) return next(err);
  664. if (song) return next(null, song, currentSongIndex, station);
  665. const currentSong = {
  666. songId: playlist[currentSongIndex].songId,
  667. title: playlist[currentSongIndex].title,
  668. duration: playlist[currentSongIndex].duration,
  669. likes: -1,
  670. dislikes: -1
  671. };
  672. return next(null, currentSong, currentSongIndex, station);
  673. };
  674. if (playlist[currentSongIndex]._id)
  675. return SongsModule.runJob(
  676. "GET_SONG",
  677. {
  678. id: playlist[currentSongIndex]._id
  679. },
  680. this
  681. )
  682. .then(response => callback(null, response.song))
  683. .catch(callback);
  684. return SongsModule.runJob(
  685. "GET_SONG_FROM_ID",
  686. {
  687. songId: playlist[currentSongIndex].songId
  688. },
  689. this
  690. )
  691. .then(response => callback(null, response.song))
  692. .catch(callback);
  693. }
  694. return next(null, null, -14, station);
  695. })
  696. );
  697. }
  698. if (station.type === "official" && station.playlist.length === 0) {
  699. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  700. .then(playlist => {
  701. if (playlist.length === 0)
  702. return next(null, StationsModule.defaultSong, 0, station);
  703. return SongsModule.runJob(
  704. "GET_SONG",
  705. {
  706. id: playlist[0]
  707. },
  708. this
  709. )
  710. .then(response => {
  711. next(null, response.song, 0, station);
  712. })
  713. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  714. })
  715. .catch(err => {
  716. next(err);
  717. });
  718. }
  719. if (station.type === "official" && station.playlist.length > 0) {
  720. return async.doUntil(
  721. next => {
  722. if (station.currentSongIndex < station.playlist.length - 1) {
  723. SongsModule.runJob(
  724. "GET_SONG",
  725. {
  726. id: station.playlist[station.currentSongIndex + 1]
  727. },
  728. this
  729. )
  730. .then(response => next(null, response.song, station.currentSongIndex + 1))
  731. .catch(() => {
  732. station.currentSongIndex += 1;
  733. next(null, null, null);
  734. });
  735. } else {
  736. StationsModule.runJob(
  737. "CALCULATE_SONG_FOR_STATION",
  738. {
  739. station
  740. },
  741. this
  742. )
  743. .then(newPlaylist => {
  744. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  745. .then(response => {
  746. station.playlist = newPlaylist;
  747. next(null, response.song, 0);
  748. })
  749. .catch(() => next(null, StationsModule.defaultSong, 0));
  750. })
  751. .catch(() => {
  752. next(null, StationsModule.defaultSong, 0);
  753. });
  754. }
  755. },
  756. (song, currentSongIndex, next) => {
  757. if (song) return next(null, true, currentSongIndex);
  758. return next(null, false);
  759. },
  760. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  761. );
  762. }
  763. },
  764. (song, currentSongIndex, station, next) => {
  765. const $set = {};
  766. if (song === null) $set.currentSong = null;
  767. else if (song.likes === -1 && song.dislikes === -1) {
  768. $set.currentSong = {
  769. songId: song.songId,
  770. title: song.title,
  771. duration: song.duration,
  772. skipDuration: 0,
  773. likes: -1,
  774. dislikes: -1
  775. };
  776. } else {
  777. $set.currentSong = {
  778. songId: song.songId,
  779. title: song.title,
  780. artists: song.artists,
  781. duration: song.duration,
  782. likes: song.likes,
  783. dislikes: song.dislikes,
  784. skipDuration: song.skipDuration,
  785. thumbnail: song.thumbnail
  786. };
  787. }
  788. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  789. $set.startedAt = Date.now();
  790. $set.timePaused = 0;
  791. if (station.paused) $set.pausedAt = Date.now();
  792. next(null, $set, station);
  793. },
  794. ($set, station, next) => {
  795. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  796. StationsModule.runJob(
  797. "UPDATE_STATION",
  798. {
  799. stationId: station._id
  800. },
  801. this
  802. )
  803. .then(station => {
  804. if (station.type === "community" && station.partyMode === true)
  805. CacheModule.runJob("PUB", {
  806. channel: "station.queueUpdate",
  807. value: payload.stationId
  808. })
  809. .then()
  810. .catch();
  811. next(null, station);
  812. })
  813. .catch(next);
  814. });
  815. }
  816. ],
  817. async (err, station) => {
  818. if (err) {
  819. err = await UtilsModule.runJob(
  820. "GET_ERROR",
  821. {
  822. error: err
  823. },
  824. this
  825. );
  826. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  827. reject(new Error(err));
  828. } else {
  829. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  830. station.currentSong.skipVotes = 0;
  831. }
  832. // TODO Pub/Sub this
  833. IOModule.runJob("EMIT_TO_ROOM", {
  834. room: `station.${station._id}`,
  835. args: [
  836. "event:songs.next",
  837. {
  838. currentSong: station.currentSong,
  839. startedAt: station.startedAt,
  840. paused: station.paused,
  841. timePaused: 0
  842. }
  843. ]
  844. })
  845. .then()
  846. .catch();
  847. if (station.privacy === "public") {
  848. IOModule.runJob("EMIT_TO_ROOM", {
  849. room: "home",
  850. args: ["event:station.nextSong", station._id, station.currentSong]
  851. })
  852. .then()
  853. .catch();
  854. } else {
  855. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  856. Object.keys(sockets).forEach(socketKey => {
  857. const socket = sockets[socketKey];
  858. const { session } = socket;
  859. if (session.sessionId) {
  860. CacheModule.runJob(
  861. "HGET",
  862. {
  863. table: "sessions",
  864. key: session.sessionId
  865. },
  866. this
  867. // eslint-disable-next-line no-loop-func
  868. ).then(session => {
  869. if (session) {
  870. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  871. userModel => {
  872. userModel.findOne(
  873. {
  874. _id: session.userId
  875. },
  876. (err, user) => {
  877. if (!err && user) {
  878. if (user.role === "admin")
  879. socket.emit(
  880. "event:station.nextSong",
  881. station._id,
  882. station.currentSong
  883. );
  884. else if (
  885. station.type === "community" &&
  886. station.owner === session.userId
  887. )
  888. socket.emit(
  889. "event:station.nextSong",
  890. station._id,
  891. station.currentSong
  892. );
  893. }
  894. }
  895. );
  896. }
  897. );
  898. }
  899. });
  900. }
  901. });
  902. }
  903. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  904. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  905. sockets: await IOModule.runJob(
  906. "GET_ROOM_SOCKETS",
  907. {
  908. room: `station.${station._id}`
  909. },
  910. this
  911. ),
  912. room: `song.${station.currentSong.songId}`
  913. });
  914. if (!station.paused) {
  915. NotificationsModule.runJob("SCHEDULE", {
  916. name: `stations.nextSong?id=${station._id}`,
  917. time: station.currentSong.duration * 1000,
  918. station
  919. });
  920. }
  921. } else {
  922. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  923. sockets: await IOModule.runJob(
  924. "GET_ROOM_SOCKETS",
  925. {
  926. room: `station.${station._id}`
  927. },
  928. this
  929. )
  930. })
  931. .then()
  932. .catch();
  933. }
  934. resolve({ station });
  935. }
  936. }
  937. );
  938. });
  939. }
  940. /**
  941. * Checks if a user can view/access a station
  942. *
  943. * @param {object} payload - object that contains the payload
  944. * @param {object} payload.station - the station object of the station in question
  945. * @param {string} payload.userId - the id of the user in question
  946. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  947. * @returns {Promise} - returns a promise (resolve, reject)
  948. */
  949. CAN_USER_VIEW_STATION(payload) {
  950. return new Promise((resolve, reject) => {
  951. async.waterfall(
  952. [
  953. next => {
  954. if (payload.station.privacy === "public") return next(true);
  955. if (payload.station.privacy === "unlisted")
  956. if (payload.hideUnlisted === true) return next();
  957. else return next(true);
  958. if (!payload.userId) return next("Not allowed");
  959. return next();
  960. },
  961. next => {
  962. DBModule.runJob(
  963. "GET_MODEL",
  964. {
  965. modelName: "user"
  966. },
  967. this
  968. ).then(userModel => {
  969. userModel.findOne({ _id: payload.userId }, next);
  970. });
  971. },
  972. (user, next) => {
  973. if (!user) return next("Not allowed");
  974. if (user.role === "admin") return next(true);
  975. if (payload.station.type === "official") return next("Not allowed");
  976. if (payload.station.owner === payload.userId) return next(true);
  977. return next("Not allowed");
  978. }
  979. ],
  980. async errOrResult => {
  981. if (errOrResult !== true && errOrResult !== "Not allowed") {
  982. errOrResult = await UtilsModule.runJob(
  983. "GET_ERROR",
  984. {
  985. error: errOrResult
  986. },
  987. this
  988. );
  989. reject(new Error(errOrResult));
  990. } else {
  991. resolve(errOrResult === true);
  992. }
  993. }
  994. );
  995. });
  996. }
  997. }
  // The module is exported as a single instance created at import time.
  export default new _StationsModule();