stations.js 27 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let IOModule;
  8. let SongsModule;
  9. let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("stations");
  14. StationsModule = this;
  15. }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. IOModule = this.moduleManager.modules.io;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1
  35. };
  36. this.userList = {};
  37. this.usersPerStation = {};
  38. this.usersPerStationCount = {};
  39. // TEMP
  40. CacheModule.runJob("SUB", {
  41. channel: "station.pause",
  42. cb: async stationId => {
  43. NotificationsModule.runJob("REMOVE", {
  44. subscription: `stations.nextSong?id=${stationId}`
  45. }).then();
  46. }
  47. });
  48. CacheModule.runJob("SUB", {
  49. channel: "station.resume",
  50. cb: async stationId => {
  51. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  52. }
  53. });
  54. CacheModule.runJob("SUB", {
  55. channel: "station.queueUpdate",
  56. cb: async stationId => {
  57. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  58. if (!station.currentSong && station.queue.length > 0) {
  59. StationsModule.runJob("INITIALIZE_STATION", {
  60. stationId
  61. }).then();
  62. }
  63. });
  64. }
  65. });
  66. CacheModule.runJob("SUB", {
  67. channel: "station.newOfficialPlaylist",
  68. cb: async stationId => {
  69. CacheModule.runJob("HGET", {
  70. table: "officialPlaylists",
  71. key: stationId
  72. }).then(playlistObj => {
  73. if (playlistObj) {
  74. IOModule.runJob("EMIT_TO_ROOM", {
  75. room: `station.${stationId}`,
  76. args: ["event:newOfficialPlaylist", playlistObj.songs]
  77. });
  78. }
  79. });
  80. }
  81. });
  82. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  83. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  84. return new Promise((resolve, reject) =>
  85. async.waterfall(
  86. [
  87. next => {
  88. this.setStage(2);
  89. CacheModule.runJob("HGETALL", { table: "stations" })
  90. .then(stations => {
  91. next(null, stations);
  92. })
  93. .catch(next);
  94. },
  95. (stations, next) => {
  96. this.setStage(3);
  97. if (!stations) return next();
  98. const stationIds = Object.keys(stations);
  99. return async.each(
  100. stationIds,
  101. (stationId, next) => {
  102. stationModel.findOne({ _id: stationId }, (err, station) => {
  103. if (err) next(err);
  104. else if (!station) {
  105. CacheModule.runJob("HDEL", {
  106. table: "stations",
  107. key: stationId
  108. })
  109. .then(() => {
  110. next();
  111. })
  112. .catch(next);
  113. } else next();
  114. });
  115. },
  116. next
  117. );
  118. },
  119. next => {
  120. this.setStage(4);
  121. stationModel.find({}, next);
  122. },
  123. (stations, next) => {
  124. this.setStage(5);
  125. async.each(
  126. stations,
  127. (station, next2) => {
  128. async.waterfall(
  129. [
  130. next => {
  131. CacheModule.runJob("HSET", {
  132. table: "stations",
  133. key: station._id,
  134. value: stationSchema(station)
  135. })
  136. .then(station => next(null, station))
  137. .catch(next);
  138. },
  139. (station, next) => {
  140. StationsModule.runJob(
  141. "INITIALIZE_STATION",
  142. {
  143. stationId: station._id
  144. },
  145. null,
  146. -1
  147. )
  148. .then(() => {
  149. next();
  150. })
  151. .catch(next);
  152. }
  153. ],
  154. err => {
  155. next2(err);
  156. }
  157. );
  158. },
  159. next
  160. );
  161. }
  162. ],
  163. async err => {
  164. if (err) {
  165. err = await UtilsModule.runJob("GET_ERROR", {
  166. error: err
  167. });
  168. reject(new Error(err));
  169. } else {
  170. resolve();
  171. }
  172. }
  173. )
  174. );
  175. }
  176. /**
  177. * Initialises a station
  178. *
  179. * @param {object} payload - object that contains the payload
  180. * @param {string} payload.stationId - id of the station to initialise
  181. * @returns {Promise} - returns a promise (resolve, reject)
  182. */
  183. INITIALIZE_STATION(payload) {
  184. return new Promise((resolve, reject) => {
  185. // if (typeof cb !== 'function') cb = ()=>{};
  186. async.waterfall(
  187. [
  188. next => {
  189. StationsModule.runJob(
  190. "GET_STATION",
  191. {
  192. stationId: payload.stationId
  193. },
  194. this
  195. )
  196. .then(station => {
  197. next(null, station);
  198. })
  199. .catch(next);
  200. },
  201. (station, next) => {
  202. if (!station) return next("Station not found.");
  203. return NotificationsModule.runJob(
  204. "UNSCHEDULE",
  205. {
  206. name: `stations.nextSong?id=${station._id}`
  207. },
  208. this
  209. )
  210. .then()
  211. .catch()
  212. .finally(() => {
  213. NotificationsModule.runJob("SUBSCRIBE", {
  214. name: `stations.nextSong?id=${station._id}`,
  215. cb: () =>
  216. StationsModule.runJob("SKIP_STATION", {
  217. stationId: station._id
  218. }),
  219. unique: true,
  220. station
  221. })
  222. .then()
  223. .catch();
  224. if (station.paused) return next(true, station);
  225. return next(null, station);
  226. });
  227. },
  228. (station, next) => {
  229. if (!station.currentSong) {
  230. return StationsModule.runJob(
  231. "SKIP_STATION",
  232. {
  233. stationId: station._id
  234. },
  235. this
  236. )
  237. .then(station => {
  238. next(true, station);
  239. })
  240. .catch(next)
  241. .finally(() => {});
  242. }
  243. let timeLeft =
  244. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  245. if (Number.isNaN(timeLeft)) timeLeft = -1;
  246. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  247. return StationsModule.runJob(
  248. "SKIP_STATION",
  249. {
  250. stationId: station._id
  251. },
  252. this
  253. )
  254. .then(station => {
  255. next(null, station);
  256. })
  257. .catch(next);
  258. }
  259. // name, time, cb, station
  260. NotificationsModule.runJob("SCHEDULE", {
  261. name: `stations.nextSong?id=${station._id}`,
  262. time: timeLeft,
  263. station
  264. });
  265. return next(null, station);
  266. }
  267. ],
  268. async (err, station) => {
  269. if (err && err !== true) {
  270. err = await UtilsModule.runJob(
  271. "GET_ERROR",
  272. {
  273. error: err
  274. },
  275. this
  276. );
  277. reject(new Error(err));
  278. } else resolve(station);
  279. }
  280. );
  281. });
  282. }
  283. /**
  284. * Calculates the next song for the station
  285. *
  286. * @param {object} payload - object that contains the payload
  287. * @param {string} payload.station - station object to calculate song for
  288. * @returns {Promise} - returns a promise (resolve, reject)
  289. */
  290. async CALCULATE_SONG_FOR_STATION(payload) {
  291. // station, bypassValidate = false
  292. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  293. return new Promise((resolve, reject) => {
  294. const songList = [];
  295. return async.waterfall(
  296. [
  297. next => {
  298. if (payload.station.genres.length === 0) return next();
  299. const genresDone = [];
  300. return payload.station.genres.forEach(genre => {
  301. songModel.find({ genres: genre }, (err, songs) => {
  302. if (!err) {
  303. songs.forEach(song => {
  304. if (songList.indexOf(song._id) === -1) {
  305. let found = false;
  306. song.genres.forEach(songGenre => {
  307. if (payload.station.blacklistedGenres.indexOf(songGenre) !== -1)
  308. found = true;
  309. });
  310. if (!found) {
  311. songList.push(song._id);
  312. }
  313. }
  314. });
  315. }
  316. genresDone.push(genre);
  317. if (genresDone.length === payload.station.genres.length) next();
  318. });
  319. });
  320. },
  321. next => {
  322. const playlist = [];
  323. songList.forEach(songId => {
  324. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  325. });
  326. // eslint-disable-next-line array-callback-return
  327. payload.station.playlist.filter(songId => {
  328. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  329. });
  330. UtilsModule.runJob("SHUFFLE", { array: playlist })
  331. .then(result => {
  332. next(null, result.array);
  333. }, this)
  334. .catch(next);
  335. },
  336. (playlist, next) => {
  337. StationsModule.runJob(
  338. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  339. {
  340. stationId: payload.station._id,
  341. songList: playlist
  342. },
  343. this
  344. )
  345. .then(() => {
  346. next(null, playlist);
  347. })
  348. .catch(next);
  349. },
  350. (playlist, next) => {
  351. StationsModule.stationModel.updateOne(
  352. { _id: payload.station._id },
  353. { $set: { playlist } },
  354. { runValidators: true },
  355. () => {
  356. StationsModule.runJob(
  357. "UPDATE_STATION",
  358. {
  359. stationId: payload.station._id
  360. },
  361. this
  362. )
  363. .then(() => {
  364. next(null, playlist);
  365. })
  366. .catch(next);
  367. }
  368. );
  369. }
  370. ],
  371. (err, newPlaylist) => {
  372. if (err) return reject(new Error(err));
  373. return resolve(newPlaylist);
  374. }
  375. );
  376. });
  377. }
  378. /**
  379. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  380. *
  381. * @param {object} payload - object that contains the payload
  382. * @param {string} payload.stationId - id of the station
  383. * @returns {Promise} - returns a promise (resolve, reject)
  384. */
  385. GET_STATION(payload) {
  386. return new Promise((resolve, reject) => {
  387. async.waterfall(
  388. [
  389. next => {
  390. CacheModule.runJob(
  391. "HGET",
  392. {
  393. table: "stations",
  394. key: payload.stationId
  395. },
  396. this
  397. )
  398. .then(station => {
  399. next(null, station);
  400. })
  401. .catch(next);
  402. },
  403. (station, next) => {
  404. if (station) return next(true, station);
  405. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  406. },
  407. (station, next) => {
  408. if (station) {
  409. if (station.type === "official") {
  410. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  411. stationId: station._id,
  412. songList: station.playlist
  413. })
  414. .then()
  415. .catch();
  416. }
  417. station = StationsModule.stationSchema(station);
  418. CacheModule.runJob("HSET", {
  419. table: "stations",
  420. key: payload.stationId,
  421. value: station
  422. })
  423. .then()
  424. .catch();
  425. next(true, station);
  426. } else next("Station not found");
  427. }
  428. ],
  429. async (err, station) => {
  430. if (err && err !== true) {
  431. err = await UtilsModule.runJob(
  432. "GET_ERROR",
  433. {
  434. error: err
  435. },
  436. this
  437. );
  438. reject(new Error(err));
  439. } else resolve(station);
  440. }
  441. );
  442. });
  443. }
  444. /**
  445. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  446. *
  447. * @param {object} payload - object that contains the payload
  448. * @param {string} payload.stationName - the unique name of the station
  449. * @returns {Promise} - returns a promise (resolve, reject)
  450. */
  451. async GET_STATION_BY_NAME(payload) {
  452. return new Promise((resolve, reject) =>
  453. async.waterfall(
  454. [
  455. next => {
  456. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  457. },
  458. (station, next) => {
  459. if (station) {
  460. if (station.type === "official") {
  461. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  462. stationId: station._id,
  463. songList: station.playlist
  464. });
  465. }
  466. station = StationsModule.stationSchema(station);
  467. CacheModule.runJob("HSET", {
  468. table: "stations",
  469. key: station._id,
  470. value: station
  471. });
  472. next(true, station);
  473. } else next("Station not found");
  474. }
  475. ],
  476. (err, station) => {
  477. if (err && err !== true) return reject(new Error(err));
  478. return resolve(station);
  479. }
  480. )
  481. );
  482. }
  483. /**
  484. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  485. *
  486. * @param {object} payload - object that contains the payload
  487. * @param {string} payload.stationId - the id of the station to update
  488. * @returns {Promise} - returns a promise (resolve, reject)
  489. */
  490. UPDATE_STATION(payload) {
  491. return new Promise((resolve, reject) => {
  492. async.waterfall(
  493. [
  494. next => {
  495. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  496. },
  497. (station, next) => {
  498. if (!station) {
  499. CacheModule.runJob("HDEL", {
  500. table: "stations",
  501. key: payload.stationId
  502. })
  503. .then()
  504. .catch();
  505. return next("Station not found");
  506. }
  507. return CacheModule.runJob(
  508. "HSET",
  509. {
  510. table: "stations",
  511. key: payload.stationId,
  512. value: station
  513. },
  514. this
  515. )
  516. .then(station => {
  517. next(null, station);
  518. })
  519. .catch(next);
  520. }
  521. ],
  522. async (err, station) => {
  523. if (err && err !== true) {
  524. err = await UtilsModule.runJob(
  525. "GET_ERROR",
  526. {
  527. error: err
  528. },
  529. this
  530. );
  531. reject(new Error(err));
  532. } else resolve(station);
  533. }
  534. );
  535. });
  536. }
  537. /**
  538. * Creates the official playlist for a station
  539. *
  540. * @param {object} payload - object that contains the payload
  541. * @param {string} payload.stationId - the id of the station
  542. * @param {Array} payload.songList - list of songs to put in official playlist
  543. * @returns {Promise} - returns a promise (resolve, reject)
  544. */
  545. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  546. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  547. console.log(typeof payload.songList, payload.songList);
  548. return new Promise(resolve => {
  549. const lessInfoPlaylist = [];
  550. return async.each(
  551. payload.songList,
  552. (song, next) => {
  553. SongsModule.runJob("GET_SONG", { id: song }, this)
  554. .then(response => {
  555. const { song } = response;
  556. if (song) {
  557. const newSong = {
  558. songId: song.songId,
  559. title: song.title,
  560. artists: song.artists,
  561. duration: song.duration
  562. };
  563. lessInfoPlaylist.push(newSong);
  564. }
  565. })
  566. .finally(() => {
  567. next();
  568. });
  569. },
  570. () => {
  571. CacheModule.runJob(
  572. "HSET",
  573. {
  574. table: "officialPlaylists",
  575. key: payload.stationId,
  576. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  577. },
  578. this
  579. ).finally(() => {
  580. CacheModule.runJob("PUB", {
  581. channel: "station.newOfficialPlaylist",
  582. value: payload.stationId
  583. });
  584. resolve();
  585. });
  586. }
  587. );
  588. });
  589. }
  590. /**
  591. * Skips a station
  592. *
  593. * @param {object} payload - object that contains the payload
  594. * @param {string} payload.stationId - the id of the station to skip
  595. * @returns {Promise} - returns a promise (resolve, reject)
  596. */
  597. SKIP_STATION(payload) {
  598. return new Promise((resolve, reject) => {
  599. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  600. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  601. async.waterfall(
  602. [
  603. next => {
  604. NotificationsModule.runJob("UNSCHEDULE", {
  605. name: `stations.nextSong?id=${payload.stationId}`
  606. })
  607. .then(() => {
  608. next();
  609. })
  610. .catch(next);
  611. },
  612. next => {
  613. StationsModule.runJob(
  614. "GET_STATION",
  615. {
  616. stationId: payload.stationId
  617. },
  618. this
  619. )
  620. .then(station => {
  621. next(null, station);
  622. })
  623. .catch(() => {});
  624. },
  625. // eslint-disable-next-line consistent-return
  626. (station, next) => {
  627. if (!station) return next("Station not found.");
  628. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  629. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  630. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  631. // Community station with party mode enabled and songs in the queue
  632. if (station.paused) return next(null, null, -19, station);
  633. return StationsModule.stationModel.updateOne(
  634. { _id: payload.stationId },
  635. {
  636. $pull: {
  637. queue: {
  638. _id: station.queue[0]._id
  639. }
  640. }
  641. },
  642. err => {
  643. if (err) return next(err);
  644. return next(null, station.queue[0], -12, station);
  645. }
  646. );
  647. }
  648. if (station.type === "community" && !station.partyMode) {
  649. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  650. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  651. if (err) return next(err);
  652. if (!playlist) return next(null, null, -13, station);
  653. playlist = playlist.songs;
  654. if (playlist.length > 0) {
  655. let currentSongIndex;
  656. if (station.currentSongIndex < playlist.length - 1)
  657. currentSongIndex = station.currentSongIndex + 1;
  658. else currentSongIndex = 0;
  659. const callback = (err, song) => {
  660. if (err) return next(err);
  661. if (song) return next(null, song, currentSongIndex, station);
  662. const currentSong = {
  663. songId: playlist[currentSongIndex].songId,
  664. title: playlist[currentSongIndex].title,
  665. duration: playlist[currentSongIndex].duration,
  666. likes: -1,
  667. dislikes: -1
  668. };
  669. return next(null, currentSong, currentSongIndex, station);
  670. };
  671. if (playlist[currentSongIndex]._id)
  672. return SongsModule.runJob(
  673. "GET_SONG",
  674. {
  675. id: playlist[currentSongIndex]._id
  676. },
  677. this
  678. )
  679. .then(response => callback(null, response.song))
  680. .catch(callback);
  681. return SongsModule.runJob(
  682. "GET_SONG_FROM_ID",
  683. {
  684. songId: playlist[currentSongIndex].songId
  685. },
  686. this
  687. )
  688. .then(response => callback(null, response.song))
  689. .catch(callback);
  690. }
  691. return next(null, null, -14, station);
  692. })
  693. );
  694. }
  695. if (station.type === "official" && station.playlist.length === 0) {
  696. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  697. .then(playlist => {
  698. if (playlist.length === 0)
  699. return next(null, StationsModule.defaultSong, 0, station);
  700. return SongsModule.runJob(
  701. "GET_SONG",
  702. {
  703. id: playlist[0]
  704. },
  705. this
  706. )
  707. .then(response => {
  708. next(null, response.song, 0, station);
  709. })
  710. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  711. })
  712. .catch(err => {
  713. next(err);
  714. });
  715. }
  716. if (station.type === "official" && station.playlist.length > 0) {
  717. return async.doUntil(
  718. next => {
  719. if (station.currentSongIndex < station.playlist.length - 1) {
  720. SongsModule.runJob(
  721. "GET_SONG",
  722. {
  723. id: station.playlist[station.currentSongIndex + 1]
  724. },
  725. this
  726. )
  727. .then(response => next(null, response.song, station.currentSongIndex + 1))
  728. .catch(() => {
  729. station.currentSongIndex += 1;
  730. next(null, null, null);
  731. });
  732. } else {
  733. StationsModule.runJob(
  734. "CALCULATE_SONG_FOR_STATION",
  735. {
  736. station
  737. },
  738. this
  739. )
  740. .then(newPlaylist => {
  741. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  742. .then(response => {
  743. station.playlist = newPlaylist;
  744. next(null, response.song, 0);
  745. })
  746. .catch(() => next(null, StationsModule.defaultSong, 0));
  747. })
  748. .catch(() => {
  749. next(null, StationsModule.defaultSong, 0);
  750. });
  751. }
  752. },
  753. (song, currentSongIndex, next) => {
  754. if (song) return next(null, true, currentSongIndex);
  755. return next(null, false);
  756. },
  757. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  758. );
  759. }
  760. },
  761. (song, currentSongIndex, station, next) => {
  762. const $set = {};
  763. if (song === null) $set.currentSong = null;
  764. else if (song.likes === -1 && song.dislikes === -1) {
  765. $set.currentSong = {
  766. songId: song.songId,
  767. title: song.title,
  768. duration: song.duration,
  769. skipDuration: 0,
  770. likes: -1,
  771. dislikes: -1
  772. };
  773. } else {
  774. $set.currentSong = {
  775. songId: song.songId,
  776. title: song.title,
  777. artists: song.artists,
  778. duration: song.duration,
  779. likes: song.likes,
  780. dislikes: song.dislikes,
  781. skipDuration: song.skipDuration,
  782. thumbnail: song.thumbnail
  783. };
  784. }
  785. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  786. $set.startedAt = Date.now();
  787. $set.timePaused = 0;
  788. if (station.paused) $set.pausedAt = Date.now();
  789. next(null, $set, station);
  790. },
  791. ($set, station, next) => {
  792. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  793. StationsModule.runJob(
  794. "UPDATE_STATION",
  795. {
  796. stationId: station._id
  797. },
  798. this
  799. )
  800. .then(station => {
  801. if (station.type === "community" && station.partyMode === true)
  802. CacheModule.runJob("PUB", {
  803. channel: "station.queueUpdate",
  804. value: payload.stationId
  805. })
  806. .then()
  807. .catch();
  808. next(null, station);
  809. })
  810. .catch(next);
  811. });
  812. }
  813. ],
  814. async (err, station) => {
  815. if (err) {
  816. err = await UtilsModule.runJob(
  817. "GET_ERROR",
  818. {
  819. error: err
  820. },
  821. this
  822. );
  823. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  824. reject(new Error(err));
  825. } else {
  826. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  827. station.currentSong.skipVotes = 0;
  828. }
  829. // TODO Pub/Sub this
  830. IOModule.runJob("EMIT_TO_ROOM", {
  831. room: `station.${station._id}`,
  832. args: [
  833. "event:songs.next",
  834. {
  835. currentSong: station.currentSong,
  836. startedAt: station.startedAt,
  837. paused: station.paused,
  838. timePaused: 0
  839. }
  840. ]
  841. })
  842. .then()
  843. .catch();
  844. if (station.privacy === "public") {
  845. IOModule.runJob("EMIT_TO_ROOM", {
  846. room: "home",
  847. args: ["event:station.nextSong", station._id, station.currentSong]
  848. })
  849. .then()
  850. .catch();
  851. } else {
  852. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  853. Object.keys(sockets).forEach(socketKey => {
  854. const socket = sockets[socketKey];
  855. const { session } = socket;
  856. if (session.sessionId) {
  857. CacheModule.runJob(
  858. "HGET",
  859. {
  860. table: "sessions",
  861. key: session.sessionId
  862. },
  863. this
  864. // eslint-disable-next-line no-loop-func
  865. ).then(session => {
  866. if (session) {
  867. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  868. userModel => {
  869. userModel.findOne(
  870. {
  871. _id: session.userId
  872. },
  873. (err, user) => {
  874. if (!err && user) {
  875. if (user.role === "admin")
  876. socket.emit(
  877. "event:station.nextSong",
  878. station._id,
  879. station.currentSong
  880. );
  881. else if (
  882. station.type === "community" &&
  883. station.owner === session.userId
  884. )
  885. socket.emit(
  886. "event:station.nextSong",
  887. station._id,
  888. station.currentSong
  889. );
  890. }
  891. }
  892. );
  893. }
  894. );
  895. }
  896. });
  897. }
  898. });
  899. }
  900. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  901. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  902. sockets: await IOModule.runJob(
  903. "GET_ROOM_SOCKETS",
  904. {
  905. room: `station.${station._id}`
  906. },
  907. this
  908. ),
  909. room: `song.${station.currentSong.songId}`
  910. });
  911. if (!station.paused) {
  912. NotificationsModule.runJob("SCHEDULE", {
  913. name: `stations.nextSong?id=${station._id}`,
  914. time: station.currentSong.duration * 1000,
  915. station
  916. });
  917. }
  918. } else {
  919. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  920. sockets: await IOModule.runJob(
  921. "GET_ROOM_SOCKETS",
  922. {
  923. room: `station.${station._id}`
  924. },
  925. this
  926. )
  927. })
  928. .then()
  929. .catch();
  930. }
  931. resolve({ station });
  932. }
  933. }
  934. );
  935. });
  936. }
  937. /**
  938. * Checks if a user can view/access a station
  939. *
  940. * @param {object} payload - object that contains the payload
  941. * @param {object} payload.station - the station object of the station in question
  942. * @param {string} payload.userId - the id of the user in question
  943. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  944. * @returns {Promise} - returns a promise (resolve, reject)
  945. */
  946. CAN_USER_VIEW_STATION(payload) {
  947. return new Promise((resolve, reject) => {
  948. async.waterfall(
  949. [
  950. next => {
  951. if (payload.station.privacy === "public") return next(true);
  952. if (payload.station.privacy === "unlisted")
  953. if (payload.hideUnlisted === true) return next();
  954. else return next(true);
  955. if (!payload.userId) return next("Not allowed");
  956. return next();
  957. },
  958. next => {
  959. DBModule.runJob(
  960. "GET_MODEL",
  961. {
  962. modelName: "user"
  963. },
  964. this
  965. ).then(userModel => {
  966. userModel.findOne({ _id: payload.userId }, next);
  967. });
  968. },
  969. (user, next) => {
  970. if (!user) return next("Not allowed");
  971. if (user.role === "admin") return next(true);
  972. if (payload.station.type === "official") return next("Not allowed");
  973. if (payload.station.owner === payload.userId) return next(true);
  974. return next("Not allowed");
  975. }
  976. ],
  977. async errOrResult => {
  978. if (errOrResult !== true && errOrResult !== "Not allowed") {
  979. errOrResult = await UtilsModule.runJob(
  980. "GET_ERROR",
  981. {
  982. error: errOrResult
  983. },
  984. this
  985. );
  986. reject(new Error(errOrResult));
  987. } else {
  988. resolve(errOrResult === true);
  989. }
  990. }
  991. );
  992. });
  993. }
  994. }
  995. export default new _StationsModule();