// stations.js
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let IOModule;
  8. let SongsModule;
  9. let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("stations");
  14. StationsModule = this;
  15. }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. IOModule = this.moduleManager.modules.io;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1,
  35. requestedAt: Date.now()
  36. };
  37. this.userList = {};
  38. this.usersPerStation = {};
  39. this.usersPerStationCount = {};
  40. // TEMP
  41. CacheModule.runJob("SUB", {
  42. channel: "station.pause",
  43. cb: async stationId => {
  44. NotificationsModule.runJob("REMOVE", {
  45. subscription: `stations.nextSong?id=${stationId}`
  46. }).then();
  47. }
  48. });
  49. CacheModule.runJob("SUB", {
  50. channel: "station.resume",
  51. cb: async stationId => {
  52. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  53. }
  54. });
  55. CacheModule.runJob("SUB", {
  56. channel: "station.queueUpdate",
  57. cb: async stationId => {
  58. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  59. if (!station.currentSong && station.queue.length > 0) {
  60. StationsModule.runJob("INITIALIZE_STATION", {
  61. stationId
  62. }).then();
  63. }
  64. });
  65. }
  66. });
  67. CacheModule.runJob("SUB", {
  68. channel: "station.newOfficialPlaylist",
  69. cb: async stationId => {
  70. CacheModule.runJob("HGET", {
  71. table: "officialPlaylists",
  72. key: stationId
  73. }).then(playlistObj => {
  74. if (playlistObj) {
  75. IOModule.runJob("EMIT_TO_ROOM", {
  76. room: `station.${stationId}`,
  77. args: ["event:newOfficialPlaylist", playlistObj.songs]
  78. });
  79. }
  80. });
  81. }
  82. });
  83. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  84. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  85. return new Promise((resolve, reject) =>
  86. async.waterfall(
  87. [
  88. next => {
  89. this.setStage(2);
  90. CacheModule.runJob("HGETALL", { table: "stations" })
  91. .then(stations => {
  92. next(null, stations);
  93. })
  94. .catch(next);
  95. },
  96. (stations, next) => {
  97. this.setStage(3);
  98. if (!stations) return next();
  99. const stationIds = Object.keys(stations);
  100. return async.each(
  101. stationIds,
  102. (stationId, next) => {
  103. stationModel.findOne({ _id: stationId }, (err, station) => {
  104. if (err) next(err);
  105. else if (!station) {
  106. CacheModule.runJob("HDEL", {
  107. table: "stations",
  108. key: stationId
  109. })
  110. .then(() => {
  111. next();
  112. })
  113. .catch(next);
  114. } else next();
  115. });
  116. },
  117. next
  118. );
  119. },
  120. next => {
  121. this.setStage(4);
  122. stationModel.find({}, next);
  123. },
  124. (stations, next) => {
  125. this.setStage(5);
  126. async.each(
  127. stations,
  128. (station, next2) => {
  129. async.waterfall(
  130. [
  131. next => {
  132. CacheModule.runJob("HSET", {
  133. table: "stations",
  134. key: station._id,
  135. value: stationSchema(station)
  136. })
  137. .then(station => next(null, station))
  138. .catch(next);
  139. },
  140. (station, next) => {
  141. StationsModule.runJob(
  142. "INITIALIZE_STATION",
  143. {
  144. stationId: station._id
  145. },
  146. null,
  147. -1
  148. )
  149. .then(() => {
  150. next();
  151. })
  152. .catch(next);
  153. }
  154. ],
  155. err => {
  156. next2(err);
  157. }
  158. );
  159. },
  160. next
  161. );
  162. }
  163. ],
  164. async err => {
  165. if (err) {
  166. err = await UtilsModule.runJob("GET_ERROR", {
  167. error: err
  168. });
  169. reject(new Error(err));
  170. } else {
  171. resolve();
  172. }
  173. }
  174. )
  175. );
  176. }
  177. /**
  178. * Initialises a station
  179. *
  180. * @param {object} payload - object that contains the payload
  181. * @param {string} payload.stationId - id of the station to initialise
  182. * @returns {Promise} - returns a promise (resolve, reject)
  183. */
  184. INITIALIZE_STATION(payload) {
  185. return new Promise((resolve, reject) => {
  186. // if (typeof cb !== 'function') cb = ()=>{};
  187. async.waterfall(
  188. [
  189. next => {
  190. StationsModule.runJob(
  191. "GET_STATION",
  192. {
  193. stationId: payload.stationId
  194. },
  195. this
  196. )
  197. .then(station => {
  198. next(null, station);
  199. })
  200. .catch(next);
  201. },
  202. (station, next) => {
  203. if (!station) return next("Station not found.");
  204. return NotificationsModule.runJob(
  205. "UNSCHEDULE",
  206. {
  207. name: `stations.nextSong?id=${station._id}`
  208. },
  209. this
  210. )
  211. .then()
  212. .catch()
  213. .finally(() => {
  214. NotificationsModule.runJob("SUBSCRIBE", {
  215. name: `stations.nextSong?id=${station._id}`,
  216. cb: () =>
  217. StationsModule.runJob("SKIP_STATION", {
  218. stationId: station._id
  219. }),
  220. unique: true,
  221. station
  222. })
  223. .then()
  224. .catch();
  225. if (station.paused) return next(true, station);
  226. return next(null, station);
  227. });
  228. },
  229. (station, next) => {
  230. if (!station.currentSong) {
  231. return StationsModule.runJob(
  232. "SKIP_STATION",
  233. {
  234. stationId: station._id
  235. },
  236. this
  237. )
  238. .then(station => {
  239. next(true, station);
  240. })
  241. .catch(next)
  242. .finally(() => {});
  243. }
  244. let timeLeft =
  245. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  246. if (Number.isNaN(timeLeft)) timeLeft = -1;
  247. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  248. return StationsModule.runJob(
  249. "SKIP_STATION",
  250. {
  251. stationId: station._id
  252. },
  253. this
  254. )
  255. .then(station => {
  256. next(null, station);
  257. })
  258. .catch(next);
  259. }
  260. // name, time, cb, station
  261. NotificationsModule.runJob("SCHEDULE", {
  262. name: `stations.nextSong?id=${station._id}`,
  263. time: timeLeft,
  264. station
  265. });
  266. return next(null, station);
  267. }
  268. ],
  269. async (err, station) => {
  270. if (err && err !== true) {
  271. err = await UtilsModule.runJob(
  272. "GET_ERROR",
  273. {
  274. error: err
  275. },
  276. this
  277. );
  278. reject(new Error(err));
  279. } else resolve(station);
  280. }
  281. );
  282. });
  283. }
  284. /**
  285. * Calculates the next song for the station
  286. *
  287. * @param {object} payload - object that contains the payload
  288. * @param {string} payload.station - station object to calculate song for
  289. * @returns {Promise} - returns a promise (resolve, reject)
  290. */
  291. async CALCULATE_SONG_FOR_STATION(payload) {
  292. // station, bypassValidate = false
  293. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  294. return new Promise((resolve, reject) => {
  295. const songList = [];
  296. return async.waterfall(
  297. [
  298. next => {
  299. if (payload.station.genres.length === 0) return next();
  300. const genresDone = [];
  301. const blacklistedGenres = payload.station.blacklistedGenres.map(blacklistedGenre =>
  302. blacklistedGenre.toLowerCase()
  303. );
  304. return payload.station.genres.forEach(genre => {
  305. songModel.find({ genres: { $regex: genre, $options: "i" } }, (err, songs) => {
  306. if (!err) {
  307. songs.forEach(song => {
  308. if (songList.indexOf(song._id) === -1) {
  309. let found = false;
  310. song.genres.forEach(songGenre => {
  311. if (blacklistedGenres.indexOf(songGenre.toLowerCase()) !== -1)
  312. found = true;
  313. });
  314. if (!found) {
  315. songList.push(song._id);
  316. }
  317. }
  318. });
  319. }
  320. genresDone.push(genre);
  321. if (genresDone.length === payload.station.genres.length) next();
  322. });
  323. });
  324. },
  325. next => {
  326. const playlist = [];
  327. songList.forEach(songId => {
  328. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  329. });
  330. // eslint-disable-next-line array-callback-return
  331. payload.station.playlist.filter(songId => {
  332. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  333. });
  334. UtilsModule.runJob("SHUFFLE", { array: playlist })
  335. .then(result => {
  336. next(null, result.array);
  337. }, this)
  338. .catch(next);
  339. },
  340. (playlist, next) => {
  341. StationsModule.runJob(
  342. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  343. {
  344. stationId: payload.station._id,
  345. songList: playlist
  346. },
  347. this
  348. )
  349. .then(() => {
  350. next(null, playlist);
  351. })
  352. .catch(next);
  353. },
  354. (playlist, next) => {
  355. StationsModule.stationModel.updateOne(
  356. { _id: payload.station._id },
  357. { $set: { playlist } },
  358. { runValidators: true },
  359. () => {
  360. StationsModule.runJob(
  361. "UPDATE_STATION",
  362. {
  363. stationId: payload.station._id
  364. },
  365. this
  366. )
  367. .then(() => {
  368. next(null, playlist);
  369. })
  370. .catch(next);
  371. }
  372. );
  373. }
  374. ],
  375. (err, newPlaylist) => {
  376. if (err) return reject(new Error(err));
  377. return resolve(newPlaylist);
  378. }
  379. );
  380. });
  381. }
  382. /**
  383. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  384. *
  385. * @param {object} payload - object that contains the payload
  386. * @param {string} payload.stationId - id of the station
  387. * @returns {Promise} - returns a promise (resolve, reject)
  388. */
  389. GET_STATION(payload) {
  390. return new Promise((resolve, reject) => {
  391. async.waterfall(
  392. [
  393. next => {
  394. CacheModule.runJob(
  395. "HGET",
  396. {
  397. table: "stations",
  398. key: payload.stationId
  399. },
  400. this
  401. )
  402. .then(station => {
  403. next(null, station);
  404. })
  405. .catch(next);
  406. },
  407. (station, next) => {
  408. if (station) return next(true, station);
  409. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  410. },
  411. (station, next) => {
  412. if (station) {
  413. if (station.type === "official") {
  414. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  415. stationId: station._id,
  416. songList: station.playlist
  417. })
  418. .then()
  419. .catch();
  420. }
  421. station = StationsModule.stationSchema(station);
  422. CacheModule.runJob("HSET", {
  423. table: "stations",
  424. key: payload.stationId,
  425. value: station
  426. })
  427. .then()
  428. .catch();
  429. next(true, station);
  430. } else next("Station not found");
  431. }
  432. ],
  433. async (err, station) => {
  434. if (err && err !== true) {
  435. err = await UtilsModule.runJob(
  436. "GET_ERROR",
  437. {
  438. error: err
  439. },
  440. this
  441. );
  442. reject(new Error(err));
  443. } else resolve(station);
  444. }
  445. );
  446. });
  447. }
  448. /**
  449. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  450. *
  451. * @param {object} payload - object that contains the payload
  452. * @param {string} payload.stationName - the unique name of the station
  453. * @returns {Promise} - returns a promise (resolve, reject)
  454. */
  455. async GET_STATION_BY_NAME(payload) {
  456. return new Promise((resolve, reject) =>
  457. async.waterfall(
  458. [
  459. next => {
  460. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  461. },
  462. (station, next) => {
  463. if (station) {
  464. if (station.type === "official") {
  465. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  466. stationId: station._id,
  467. songList: station.playlist
  468. });
  469. }
  470. station = StationsModule.stationSchema(station);
  471. CacheModule.runJob("HSET", {
  472. table: "stations",
  473. key: station._id,
  474. value: station
  475. });
  476. next(true, station);
  477. } else next("Station not found");
  478. }
  479. ],
  480. (err, station) => {
  481. if (err && err !== true) return reject(new Error(err));
  482. return resolve(station);
  483. }
  484. )
  485. );
  486. }
  487. /**
  488. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  489. *
  490. * @param {object} payload - object that contains the payload
  491. * @param {string} payload.stationId - the id of the station to update
  492. * @returns {Promise} - returns a promise (resolve, reject)
  493. */
  494. UPDATE_STATION(payload) {
  495. return new Promise((resolve, reject) => {
  496. async.waterfall(
  497. [
  498. next => {
  499. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  500. },
  501. (station, next) => {
  502. if (!station) {
  503. CacheModule.runJob("HDEL", {
  504. table: "stations",
  505. key: payload.stationId
  506. })
  507. .then()
  508. .catch();
  509. return next("Station not found");
  510. }
  511. return CacheModule.runJob(
  512. "HSET",
  513. {
  514. table: "stations",
  515. key: payload.stationId,
  516. value: station
  517. },
  518. this
  519. )
  520. .then(station => {
  521. next(null, station);
  522. })
  523. .catch(next);
  524. }
  525. ],
  526. async (err, station) => {
  527. if (err && err !== true) {
  528. err = await UtilsModule.runJob(
  529. "GET_ERROR",
  530. {
  531. error: err
  532. },
  533. this
  534. );
  535. reject(new Error(err));
  536. } else resolve(station);
  537. }
  538. );
  539. });
  540. }
  541. /**
  542. * Creates the official playlist for a station
  543. *
  544. * @param {object} payload - object that contains the payload
  545. * @param {string} payload.stationId - the id of the station
  546. * @param {Array} payload.songList - list of songs to put in official playlist
  547. * @returns {Promise} - returns a promise (resolve, reject)
  548. */
  549. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  550. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  551. console.log(typeof payload.songList, payload.songList);
  552. return new Promise(resolve => {
  553. const lessInfoPlaylist = [];
  554. return async.each(
  555. payload.songList,
  556. (song, next) => {
  557. SongsModule.runJob("GET_SONG", { id: song }, this)
  558. .then(response => {
  559. const { song } = response;
  560. if (song) {
  561. const newSong = {
  562. songId: song.songId,
  563. title: song.title,
  564. artists: song.artists,
  565. duration: song.duration,
  566. thumbnail: song.thumbnail,
  567. requestedAt: song.requestedAt
  568. };
  569. lessInfoPlaylist.push(newSong);
  570. }
  571. })
  572. .finally(() => {
  573. next();
  574. });
  575. },
  576. () => {
  577. CacheModule.runJob(
  578. "HSET",
  579. {
  580. table: "officialPlaylists",
  581. key: payload.stationId,
  582. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  583. },
  584. this
  585. ).finally(() => {
  586. CacheModule.runJob("PUB", {
  587. channel: "station.newOfficialPlaylist",
  588. value: payload.stationId
  589. });
  590. resolve();
  591. });
  592. }
  593. );
  594. });
  595. }
  596. /**
  597. * Skips a station
  598. *
  599. * @param {object} payload - object that contains the payload
  600. * @param {string} payload.stationId - the id of the station to skip
  601. * @returns {Promise} - returns a promise (resolve, reject)
  602. */
  603. SKIP_STATION(payload) {
  604. return new Promise((resolve, reject) => {
  605. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  606. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  607. async.waterfall(
  608. [
  609. next => {
  610. NotificationsModule.runJob("UNSCHEDULE", {
  611. name: `stations.nextSong?id=${payload.stationId}`
  612. })
  613. .then(() => {
  614. next();
  615. })
  616. .catch(next);
  617. },
  618. next => {
  619. StationsModule.runJob(
  620. "GET_STATION",
  621. {
  622. stationId: payload.stationId
  623. },
  624. this
  625. )
  626. .then(station => {
  627. next(null, station);
  628. })
  629. .catch(() => {});
  630. },
  631. // eslint-disable-next-line consistent-return
  632. (station, next) => {
  633. if (!station) return next("Station not found.");
  634. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  635. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  636. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  637. // Community station with party mode enabled and songs in the queue
  638. if (station.paused) return next(null, null, -19, station);
  639. return StationsModule.stationModel.updateOne(
  640. { _id: payload.stationId },
  641. {
  642. $pull: {
  643. queue: {
  644. _id: station.queue[0]._id
  645. }
  646. }
  647. },
  648. err => {
  649. if (err) return next(err);
  650. return next(null, station.queue[0], -12, station);
  651. }
  652. );
  653. }
  654. if (station.type === "community" && !station.partyMode) {
  655. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  656. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  657. if (err) return next(err);
  658. if (!playlist) return next(null, null, -13, station);
  659. playlist = playlist.songs;
  660. if (playlist.length > 0) {
  661. let currentSongIndex;
  662. if (station.currentSongIndex < playlist.length - 1)
  663. currentSongIndex = station.currentSongIndex + 1;
  664. else currentSongIndex = 0;
  665. const callback = (err, song) => {
  666. if (err) return next(err);
  667. if (song) return next(null, song, currentSongIndex, station);
  668. const currentSong = {
  669. songId: playlist[currentSongIndex].songId,
  670. title: playlist[currentSongIndex].title,
  671. duration: playlist[currentSongIndex].duration,
  672. likes: -1,
  673. dislikes: -1,
  674. requestedAt: playlist[currentSongIndex].requestedAt
  675. };
  676. return next(null, currentSong, currentSongIndex, station);
  677. };
  678. if (playlist[currentSongIndex]._id)
  679. return SongsModule.runJob(
  680. "GET_SONG",
  681. {
  682. id: playlist[currentSongIndex]._id
  683. },
  684. this
  685. )
  686. .then(response => callback(null, response.song))
  687. .catch(callback);
  688. return SongsModule.runJob(
  689. "GET_SONG_FROM_ID",
  690. {
  691. songId: playlist[currentSongIndex].songId
  692. },
  693. this
  694. )
  695. .then(response => callback(null, response.song))
  696. .catch(callback);
  697. }
  698. return next(null, null, -14, station);
  699. })
  700. );
  701. }
  702. if (station.type === "official" && station.playlist.length === 0) {
  703. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  704. .then(playlist => {
  705. if (playlist.length === 0)
  706. return next(null, StationsModule.defaultSong, 0, station);
  707. return SongsModule.runJob(
  708. "GET_SONG",
  709. {
  710. id: playlist[0]
  711. },
  712. this
  713. )
  714. .then(response => {
  715. next(null, response.song, 0, station);
  716. })
  717. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  718. })
  719. .catch(err => {
  720. next(err);
  721. });
  722. }
  723. if (station.type === "official" && station.playlist.length > 0) {
  724. return async.doUntil(
  725. next => {
  726. if (station.currentSongIndex < station.playlist.length - 1) {
  727. SongsModule.runJob(
  728. "GET_SONG",
  729. {
  730. id: station.playlist[station.currentSongIndex + 1]
  731. },
  732. this
  733. )
  734. .then(response => next(null, response.song, station.currentSongIndex + 1))
  735. .catch(() => {
  736. station.currentSongIndex += 1;
  737. next(null, null, null);
  738. });
  739. } else {
  740. StationsModule.runJob(
  741. "CALCULATE_SONG_FOR_STATION",
  742. {
  743. station
  744. },
  745. this
  746. )
  747. .then(newPlaylist => {
  748. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  749. .then(response => {
  750. station.playlist = newPlaylist;
  751. next(null, response.song, 0);
  752. })
  753. .catch(() => next(null, StationsModule.defaultSong, 0));
  754. })
  755. .catch(() => {
  756. next(null, StationsModule.defaultSong, 0);
  757. });
  758. }
  759. },
  760. (song, currentSongIndex, next) => {
  761. if (song) return next(null, true, currentSongIndex);
  762. return next(null, false);
  763. },
  764. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  765. );
  766. }
  767. },
  768. (song, currentSongIndex, station, next) => {
  769. const $set = {};
  770. if (song === null) $set.currentSong = null;
  771. else if (song.likes === -1 && song.dislikes === -1) {
  772. $set.currentSong = {
  773. songId: song.songId,
  774. title: song.title,
  775. duration: song.duration,
  776. skipDuration: 0,
  777. likes: -1,
  778. dislikes: -1,
  779. requestedAt: song.requestedAt
  780. };
  781. } else {
  782. $set.currentSong = {
  783. songId: song.songId,
  784. title: song.title,
  785. artists: song.artists,
  786. duration: song.duration,
  787. likes: song.likes,
  788. dislikes: song.dislikes,
  789. skipDuration: song.skipDuration,
  790. thumbnail: song.thumbnail,
  791. requestedAt: song.requestedAt
  792. };
  793. }
  794. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  795. $set.startedAt = Date.now();
  796. $set.timePaused = 0;
  797. if (station.paused) $set.pausedAt = Date.now();
  798. next(null, $set, station);
  799. },
  800. ($set, station, next) => {
  801. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  802. StationsModule.runJob(
  803. "UPDATE_STATION",
  804. {
  805. stationId: station._id
  806. },
  807. this
  808. )
  809. .then(station => {
  810. if (station.type === "community" && station.partyMode === true)
  811. CacheModule.runJob("PUB", {
  812. channel: "station.queueUpdate",
  813. value: payload.stationId
  814. })
  815. .then()
  816. .catch();
  817. next(null, station);
  818. })
  819. .catch(next);
  820. });
  821. }
  822. ],
  823. async (err, station) => {
  824. if (err) {
  825. err = await UtilsModule.runJob(
  826. "GET_ERROR",
  827. {
  828. error: err
  829. },
  830. this
  831. );
  832. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  833. reject(new Error(err));
  834. } else {
  835. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  836. station.currentSong.skipVotes = 0;
  837. }
  838. // TODO Pub/Sub this
  839. IOModule.runJob("EMIT_TO_ROOM", {
  840. room: `station.${station._id}`,
  841. args: [
  842. "event:songs.next",
  843. {
  844. currentSong: station.currentSong,
  845. startedAt: station.startedAt,
  846. paused: station.paused,
  847. timePaused: 0
  848. }
  849. ]
  850. })
  851. .then()
  852. .catch();
  853. if (station.privacy === "public") {
  854. IOModule.runJob("EMIT_TO_ROOM", {
  855. room: "home",
  856. args: ["event:station.nextSong", station._id, station.currentSong]
  857. })
  858. .then()
  859. .catch();
  860. } else {
  861. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  862. Object.keys(sockets).forEach(socketKey => {
  863. const socket = sockets[socketKey];
  864. const { session } = socket;
  865. if (session.sessionId) {
  866. CacheModule.runJob(
  867. "HGET",
  868. {
  869. table: "sessions",
  870. key: session.sessionId
  871. },
  872. this
  873. // eslint-disable-next-line no-loop-func
  874. ).then(session => {
  875. if (session) {
  876. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  877. userModel => {
  878. userModel.findOne(
  879. {
  880. _id: session.userId
  881. },
  882. (err, user) => {
  883. if (!err && user) {
  884. if (user.role === "admin")
  885. socket.emit(
  886. "event:station.nextSong",
  887. station._id,
  888. station.currentSong
  889. );
  890. else if (
  891. station.type === "community" &&
  892. station.owner === session.userId
  893. )
  894. socket.emit(
  895. "event:station.nextSong",
  896. station._id,
  897. station.currentSong
  898. );
  899. }
  900. }
  901. );
  902. }
  903. );
  904. }
  905. });
  906. }
  907. });
  908. }
  909. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  910. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  911. sockets: await IOModule.runJob(
  912. "GET_ROOM_SOCKETS",
  913. {
  914. room: `station.${station._id}`
  915. },
  916. this
  917. ),
  918. room: `song.${station.currentSong.songId}`
  919. });
  920. if (!station.paused) {
  921. NotificationsModule.runJob("SCHEDULE", {
  922. name: `stations.nextSong?id=${station._id}`,
  923. time: station.currentSong.duration * 1000,
  924. station
  925. });
  926. }
  927. } else {
  928. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  929. sockets: await IOModule.runJob(
  930. "GET_ROOM_SOCKETS",
  931. {
  932. room: `station.${station._id}`
  933. },
  934. this
  935. )
  936. })
  937. .then()
  938. .catch();
  939. }
  940. resolve({ station });
  941. }
  942. }
  943. );
  944. });
  945. }
  946. /**
  947. * Checks if a user can view/access a station
  948. *
  949. * @param {object} payload - object that contains the payload
  950. * @param {object} payload.station - the station object of the station in question
  951. * @param {string} payload.userId - the id of the user in question
  952. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  953. * @returns {Promise} - returns a promise (resolve, reject)
  954. */
  955. CAN_USER_VIEW_STATION(payload) {
  956. return new Promise((resolve, reject) => {
  957. async.waterfall(
  958. [
  959. next => {
  960. if (payload.station.privacy === "public") return next(true);
  961. if (payload.station.privacy === "unlisted")
  962. if (payload.hideUnlisted === true) return next();
  963. else return next(true);
  964. if (!payload.userId) return next("Not allowed");
  965. return next();
  966. },
  967. next => {
  968. DBModule.runJob(
  969. "GET_MODEL",
  970. {
  971. modelName: "user"
  972. },
  973. this
  974. ).then(userModel => {
  975. userModel.findOne({ _id: payload.userId }, next);
  976. });
  977. },
  978. (user, next) => {
  979. if (!user) return next("Not allowed");
  980. if (user.role === "admin") return next(true);
  981. if (payload.station.type === "official") return next("Not allowed");
  982. if (payload.station.owner === payload.userId) return next(true);
  983. return next("Not allowed");
  984. }
  985. ],
  986. async errOrResult => {
  987. if (errOrResult !== true && errOrResult !== "Not allowed") {
  988. errOrResult = await UtilsModule.runJob(
  989. "GET_ERROR",
  990. {
  991. error: errOrResult
  992. },
  993. this
  994. );
  995. reject(new Error(errOrResult));
  996. } else {
  997. resolve(errOrResult === true);
  998. }
  999. }
  1000. );
  1001. });
  1002. }
  1003. }
  1004. export default new _StationsModule();