stations.js 31 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let WSModule;
  8. let SongsModule;
  9. let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("stations");
  14. StationsModule = this;
  15. }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. WSModule = this.moduleManager.modules.ws;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1,
  35. requestedAt: Date.now()
  36. };
  37. this.userList = {};
  38. this.usersPerStation = {};
  39. this.usersPerStationCount = {};
  40. // TEMP
  41. CacheModule.runJob("SUB", {
  42. channel: "station.pause",
  43. cb: async stationId => {
  44. NotificationsModule.runJob("REMOVE", {
  45. subscription: `stations.nextSong?id=${stationId}`
  46. }).then();
  47. }
  48. });
  49. CacheModule.runJob("SUB", {
  50. channel: "station.resume",
  51. cb: async stationId => {
  52. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  53. }
  54. });
  55. CacheModule.runJob("SUB", {
  56. channel: "station.queueUpdate",
  57. cb: async stationId => {
  58. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  59. if (!station.currentSong && station.queue.length > 0) {
  60. StationsModule.runJob("INITIALIZE_STATION", {
  61. stationId
  62. }).then();
  63. }
  64. });
  65. }
  66. });
  67. CacheModule.runJob("SUB", {
  68. channel: "station.newOfficialPlaylist",
  69. cb: async stationId => {
  70. CacheModule.runJob("HGET", {
  71. table: "officialPlaylists",
  72. key: stationId
  73. }).then(playlistObj => {
  74. if (playlistObj) {
  75. WSModule.runJob("EMIT_TO_ROOM", {
  76. room: `station.${stationId}`,
  77. args: ["event:newOfficialPlaylist", playlistObj.songs]
  78. });
  79. }
  80. });
  81. }
  82. });
  83. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  84. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  85. return new Promise((resolve, reject) =>
  86. async.waterfall(
  87. [
  88. next => {
  89. this.setStage(2);
  90. CacheModule.runJob("HGETALL", { table: "stations" })
  91. .then(stations => {
  92. next(null, stations);
  93. })
  94. .catch(next);
  95. },
  96. (stations, next) => {
  97. this.setStage(3);
  98. if (!stations) return next();
  99. const stationIds = Object.keys(stations);
  100. return async.each(
  101. stationIds,
  102. (stationId, next) => {
  103. stationModel.findOne({ _id: stationId }, (err, station) => {
  104. if (err) next(err);
  105. else if (!station) {
  106. CacheModule.runJob("HDEL", {
  107. table: "stations",
  108. key: stationId
  109. })
  110. .then(() => {
  111. next();
  112. })
  113. .catch(next);
  114. } else next();
  115. });
  116. },
  117. next
  118. );
  119. },
  120. next => {
  121. this.setStage(4);
  122. stationModel.find({}, next);
  123. },
  124. (stations, next) => {
  125. this.setStage(5);
  126. async.each(
  127. stations,
  128. (station, next2) => {
  129. async.waterfall(
  130. [
  131. next => {
  132. CacheModule.runJob("HSET", {
  133. table: "stations",
  134. key: station._id,
  135. value: stationSchema(station)
  136. })
  137. .then(station => next(null, station))
  138. .catch(next);
  139. },
  140. (station, next) => {
  141. StationsModule.runJob(
  142. "INITIALIZE_STATION",
  143. {
  144. stationId: station._id
  145. },
  146. null,
  147. -1
  148. )
  149. .then(() => {
  150. next();
  151. })
  152. .catch(next);
  153. }
  154. ],
  155. err => {
  156. next2(err);
  157. }
  158. );
  159. },
  160. next
  161. );
  162. }
  163. ],
  164. async err => {
  165. if (err) {
  166. err = await UtilsModule.runJob("GET_ERROR", {
  167. error: err
  168. });
  169. reject(new Error(err));
  170. } else {
  171. resolve();
  172. }
  173. }
  174. )
  175. );
  176. }
  177. /**
  178. * Initialises a station
  179. *
  180. * @param {object} payload - object that contains the payload
  181. * @param {string} payload.stationId - id of the station to initialise
  182. * @returns {Promise} - returns a promise (resolve, reject)
  183. */
  184. INITIALIZE_STATION(payload) {
  185. return new Promise((resolve, reject) => {
  186. // if (typeof cb !== 'function') cb = ()=>{};
  187. async.waterfall(
  188. [
  189. next => {
  190. StationsModule.runJob(
  191. "GET_STATION",
  192. {
  193. stationId: payload.stationId
  194. },
  195. this
  196. )
  197. .then(station => {
  198. next(null, station);
  199. })
  200. .catch(next);
  201. },
  202. (station, next) => {
  203. if (!station) return next("Station not found.");
  204. return NotificationsModule.runJob(
  205. "UNSCHEDULE",
  206. {
  207. name: `stations.nextSong?id=${station._id}`
  208. },
  209. this
  210. )
  211. .then()
  212. .catch()
  213. .finally(() => {
  214. NotificationsModule.runJob("SUBSCRIBE", {
  215. name: `stations.nextSong?id=${station._id}`,
  216. cb: () =>
  217. StationsModule.runJob("SKIP_STATION", {
  218. stationId: station._id
  219. }),
  220. unique: true,
  221. station
  222. })
  223. .then()
  224. .catch();
  225. if (station.paused) return next(true, station);
  226. return next(null, station);
  227. });
  228. },
  229. (station, next) => {
  230. if (!station.currentSong) {
  231. return StationsModule.runJob(
  232. "SKIP_STATION",
  233. {
  234. stationId: station._id
  235. },
  236. this
  237. )
  238. .then(station => {
  239. next(true, station);
  240. })
  241. .catch(next)
  242. .finally(() => {});
  243. }
  244. let timeLeft =
  245. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  246. if (Number.isNaN(timeLeft)) timeLeft = -1;
  247. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  248. return StationsModule.runJob(
  249. "SKIP_STATION",
  250. {
  251. stationId: station._id
  252. },
  253. this
  254. )
  255. .then(station => {
  256. next(null, station);
  257. })
  258. .catch(next);
  259. }
  260. // name, time, cb, station
  261. NotificationsModule.runJob("SCHEDULE", {
  262. name: `stations.nextSong?id=${station._id}`,
  263. time: timeLeft,
  264. station
  265. });
  266. return next(null, station);
  267. }
  268. ],
  269. async (err, station) => {
  270. if (err && err !== true) {
  271. err = await UtilsModule.runJob(
  272. "GET_ERROR",
  273. {
  274. error: err
  275. },
  276. this
  277. );
  278. reject(new Error(err));
  279. } else resolve(station);
  280. }
  281. );
  282. });
  283. }
  284. /**
  285. * Calculates the next song for the station
  286. *
  287. * @param {object} payload - object that contains the payload
  288. * @param {string} payload.station - station object to calculate song for
  289. * @returns {Promise} - returns a promise (resolve, reject)
  290. */
  291. async CALCULATE_SONG_FOR_STATION(payload) {
  292. // station, bypassValidate = false
  293. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  294. return new Promise((resolve, reject) => {
  295. const songList = [];
  296. return async.waterfall(
  297. [
  298. next => {
  299. if (payload.station.genres.length === 0) return next();
  300. const genresDone = [];
  301. const blacklistedGenres = payload.station.blacklistedGenres.map(blacklistedGenre =>
  302. blacklistedGenre.toLowerCase()
  303. );
  304. return payload.station.genres.forEach(genre => {
  305. songModel.find({ genres: { $regex: genre, $options: "i" } }, (err, songs) => {
  306. if (!err) {
  307. songs.forEach(song => {
  308. if (songList.indexOf(song._id) === -1) {
  309. let found = false;
  310. song.genres.forEach(songGenre => {
  311. if (blacklistedGenres.indexOf(songGenre.toLowerCase()) !== -1)
  312. found = true;
  313. });
  314. if (!found) {
  315. songList.push(song._id);
  316. }
  317. }
  318. });
  319. }
  320. genresDone.push(genre);
  321. if (genresDone.length === payload.station.genres.length) next();
  322. });
  323. });
  324. },
  325. next => {
  326. const playlist = [];
  327. songList.forEach(songId => {
  328. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  329. });
  330. // eslint-disable-next-line array-callback-return
  331. payload.station.playlist.filter(songId => {
  332. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  333. });
  334. UtilsModule.runJob("SHUFFLE", { array: playlist })
  335. .then(result => {
  336. next(null, result.array);
  337. }, this)
  338. .catch(next);
  339. },
  340. (playlist, next) => {
  341. StationsModule.runJob(
  342. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  343. {
  344. stationId: payload.station._id,
  345. songList: playlist
  346. },
  347. this
  348. )
  349. .then(() => {
  350. next(null, playlist);
  351. })
  352. .catch(next);
  353. },
  354. (playlist, next) => {
  355. StationsModule.stationModel.updateOne(
  356. { _id: payload.station._id },
  357. { $set: { playlist } },
  358. { runValidators: true },
  359. () => {
  360. StationsModule.runJob(
  361. "UPDATE_STATION",
  362. {
  363. stationId: payload.station._id
  364. },
  365. this
  366. )
  367. .then(() => {
  368. next(null, playlist);
  369. })
  370. .catch(next);
  371. }
  372. );
  373. }
  374. ],
  375. (err, newPlaylist) => {
  376. if (err) return reject(new Error(err));
  377. return resolve(newPlaylist);
  378. }
  379. );
  380. });
  381. }
  382. /**
  383. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  384. *
  385. * @param {object} payload - object that contains the payload
  386. * @param {string} payload.stationId - id of the station
  387. * @returns {Promise} - returns a promise (resolve, reject)
  388. */
  389. GET_STATION(payload) {
  390. return new Promise((resolve, reject) => {
  391. async.waterfall(
  392. [
  393. next => {
  394. CacheModule.runJob("HGET", { table: "stations", key: payload.stationId }, this)
  395. .then(station => next(null, station))
  396. .catch(next);
  397. },
  398. (station, next) => {
  399. if (station) return next(true, station);
  400. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  401. },
  402. (station, next) => {
  403. if (station) {
  404. if (station.type === "official") {
  405. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  406. stationId: station._id,
  407. songList: station.playlist
  408. })
  409. .then()
  410. .catch();
  411. }
  412. station = StationsModule.stationSchema(station);
  413. CacheModule.runJob("HSET", {
  414. table: "stations",
  415. key: payload.stationId,
  416. value: station
  417. })
  418. .then()
  419. .catch();
  420. next(true, station);
  421. } else next("Station not found");
  422. }
  423. ],
  424. async (err, station) => {
  425. if (err && err !== true) {
  426. err = await UtilsModule.runJob(
  427. "GET_ERROR",
  428. {
  429. error: err
  430. },
  431. this
  432. );
  433. reject(new Error(err));
  434. } else resolve(station);
  435. }
  436. );
  437. });
  438. }
  439. /**
  440. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  441. *
  442. * @param {object} payload - object that contains the payload
  443. * @param {string} payload.stationName - the unique name of the station
  444. * @returns {Promise} - returns a promise (resolve, reject)
  445. */
  446. async GET_STATION_BY_NAME(payload) {
  447. return new Promise((resolve, reject) =>
  448. async.waterfall(
  449. [
  450. next => {
  451. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  452. },
  453. (station, next) => {
  454. if (station) {
  455. if (station.type === "official") {
  456. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  457. stationId: station._id,
  458. songList: station.playlist
  459. });
  460. }
  461. station = StationsModule.stationSchema(station);
  462. CacheModule.runJob("HSET", {
  463. table: "stations",
  464. key: station._id,
  465. value: station
  466. });
  467. next(true, station);
  468. } else next("Station not found");
  469. }
  470. ],
  471. (err, station) => {
  472. if (err && err !== true) return reject(new Error(err));
  473. return resolve(station);
  474. }
  475. )
  476. );
  477. }
  478. /**
  479. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  480. *
  481. * @param {object} payload - object that contains the payload
  482. * @param {string} payload.stationId - the id of the station to update
  483. * @returns {Promise} - returns a promise (resolve, reject)
  484. */
  485. UPDATE_STATION(payload) {
  486. return new Promise((resolve, reject) => {
  487. async.waterfall(
  488. [
  489. next => {
  490. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  491. },
  492. (station, next) => {
  493. if (!station) {
  494. CacheModule.runJob("HDEL", {
  495. table: "stations",
  496. key: payload.stationId
  497. })
  498. .then()
  499. .catch();
  500. return next("Station not found");
  501. }
  502. return CacheModule.runJob(
  503. "HSET",
  504. {
  505. table: "stations",
  506. key: payload.stationId,
  507. value: station
  508. },
  509. this
  510. )
  511. .then(station => {
  512. next(null, station);
  513. })
  514. .catch(next);
  515. }
  516. ],
  517. async (err, station) => {
  518. if (err && err !== true) {
  519. err = await UtilsModule.runJob(
  520. "GET_ERROR",
  521. {
  522. error: err
  523. },
  524. this
  525. );
  526. reject(new Error(err));
  527. } else resolve(station);
  528. }
  529. );
  530. });
  531. }
  532. /**
  533. * Creates the official playlist for a station
  534. *
  535. * @param {object} payload - object that contains the payload
  536. * @param {string} payload.stationId - the id of the station
  537. * @param {Array} payload.songList - list of songs to put in official playlist
  538. * @returns {Promise} - returns a promise (resolve, reject)
  539. */
  540. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  541. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  542. console.log(typeof payload.songList, payload.songList);
  543. return new Promise(resolve => {
  544. const lessInfoPlaylist = [];
  545. return async.each(
  546. payload.songList,
  547. (song, next) => {
  548. SongsModule.runJob("GET_SONG", { id: song }, this)
  549. .then(response => {
  550. const { song } = response;
  551. if (song) {
  552. const newSong = {
  553. _id: song._id,
  554. songId: song.songId,
  555. title: song.title,
  556. artists: song.artists,
  557. duration: song.duration,
  558. thumbnail: song.thumbnail,
  559. requestedAt: song.requestedAt
  560. };
  561. lessInfoPlaylist.push(newSong);
  562. }
  563. })
  564. .finally(() => {
  565. next();
  566. });
  567. },
  568. () => {
  569. CacheModule.runJob(
  570. "HSET",
  571. {
  572. table: "officialPlaylists",
  573. key: payload.stationId,
  574. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  575. },
  576. this
  577. ).finally(() => {
  578. CacheModule.runJob("PUB", {
  579. channel: "station.newOfficialPlaylist",
  580. value: payload.stationId
  581. });
  582. resolve();
  583. });
  584. }
  585. );
  586. });
  587. }
  588. /**
  589. * Skips a station
  590. *
  591. * @param {object} payload - object that contains the payload
  592. * @param {string} payload.stationId - the id of the station to skip
  593. * @returns {Promise} - returns a promise (resolve, reject)
  594. */
  595. SKIP_STATION(payload) {
  596. return new Promise((resolve, reject) => {
  597. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  598. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  599. async.waterfall(
  600. [
  601. next => {
  602. NotificationsModule.runJob("UNSCHEDULE", {
  603. name: `stations.nextSong?id=${payload.stationId}`
  604. })
  605. .then(() => {
  606. next();
  607. })
  608. .catch(next);
  609. },
  610. next => {
  611. StationsModule.runJob(
  612. "GET_STATION",
  613. {
  614. stationId: payload.stationId
  615. },
  616. this
  617. )
  618. .then(station => {
  619. next(null, station);
  620. })
  621. .catch(() => {});
  622. },
  623. // eslint-disable-next-line consistent-return
  624. (station, next) => {
  625. if (!station) return next("Station not found.");
  626. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  627. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  628. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  629. // Community station with party mode enabled and songs in the queue
  630. if (station.paused) return next(null, null, -19, station);
  631. return StationsModule.stationModel.updateOne(
  632. { _id: payload.stationId },
  633. {
  634. $pull: {
  635. queue: {
  636. _id: station.queue[0]._id
  637. }
  638. }
  639. },
  640. err => {
  641. if (err) return next(err);
  642. return next(null, station.queue[0], -12, station);
  643. }
  644. );
  645. }
  646. if (station.type === "community" && !station.partyMode) {
  647. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  648. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  649. if (err) return next(err);
  650. if (!playlist) return next(null, null, -13, station);
  651. playlist = playlist.songs;
  652. if (playlist.length > 0) {
  653. let currentSongIndex;
  654. if (station.currentSongIndex < playlist.length - 1)
  655. currentSongIndex = station.currentSongIndex + 1;
  656. else currentSongIndex = 0;
  657. const callback = (err, song) => {
  658. if (err) return next(err);
  659. if (song) return next(null, song, currentSongIndex, station);
  660. const currentSong = {
  661. songId: playlist[currentSongIndex].songId,
  662. title: playlist[currentSongIndex].title,
  663. duration: playlist[currentSongIndex].duration,
  664. likes: -1,
  665. dislikes: -1,
  666. requestedAt: playlist[currentSongIndex].requestedAt
  667. };
  668. return next(null, currentSong, currentSongIndex, station);
  669. };
  670. if (playlist[currentSongIndex]._id)
  671. return SongsModule.runJob(
  672. "GET_SONG",
  673. {
  674. id: playlist[currentSongIndex]._id
  675. },
  676. this
  677. )
  678. .then(response => callback(null, response.song))
  679. .catch(callback);
  680. return SongsModule.runJob(
  681. "GET_SONG_FROM_ID",
  682. {
  683. songId: playlist[currentSongIndex].songId
  684. },
  685. this
  686. )
  687. .then(response => callback(null, response.song))
  688. .catch(callback);
  689. }
  690. return next(null, null, -14, station);
  691. })
  692. );
  693. }
  694. if (station.type === "official" && station.playlist.length === 0) {
  695. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  696. .then(playlist => {
  697. if (playlist.length === 0)
  698. return next(null, StationsModule.defaultSong, 0, station);
  699. return SongsModule.runJob(
  700. "GET_SONG",
  701. {
  702. id: playlist[0]
  703. },
  704. this
  705. )
  706. .then(response => {
  707. next(null, response.song, 0, station);
  708. })
  709. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  710. })
  711. .catch(err => {
  712. next(err);
  713. });
  714. }
  715. if (station.type === "official" && station.playlist.length > 0) {
  716. return async.doUntil(
  717. next => {
  718. if (station.currentSongIndex < station.playlist.length - 1) {
  719. SongsModule.runJob(
  720. "GET_SONG",
  721. {
  722. id: station.playlist[station.currentSongIndex + 1]
  723. },
  724. this
  725. )
  726. .then(response => next(null, response.song, station.currentSongIndex + 1))
  727. .catch(() => {
  728. station.currentSongIndex += 1;
  729. next(null, null, null);
  730. });
  731. } else {
  732. StationsModule.runJob(
  733. "CALCULATE_SONG_FOR_STATION",
  734. {
  735. station
  736. },
  737. this
  738. )
  739. .then(newPlaylist => {
  740. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  741. .then(response => {
  742. station.playlist = newPlaylist;
  743. next(null, response.song, 0);
  744. })
  745. .catch(() => next(null, StationsModule.defaultSong, 0));
  746. })
  747. .catch(() => {
  748. next(null, StationsModule.defaultSong, 0);
  749. });
  750. }
  751. },
  752. (song, currentSongIndex, next) => {
  753. if (song) return next(null, true, currentSongIndex);
  754. return next(null, false);
  755. },
  756. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  757. );
  758. }
  759. },
  760. (song, currentSongIndex, station, next) => {
  761. const $set = {};
  762. if (song === null) $set.currentSong = null;
  763. else if (song.likes === -1 && song.dislikes === -1) {
  764. $set.currentSong = {
  765. songId: song.songId,
  766. title: song.title,
  767. duration: song.duration,
  768. skipDuration: 0,
  769. likes: -1,
  770. dislikes: -1,
  771. requestedAt: song.requestedAt
  772. };
  773. } else {
  774. $set.currentSong = {
  775. _id: song._id,
  776. songId: song.songId,
  777. title: song.title,
  778. artists: song.artists,
  779. duration: song.duration,
  780. likes: song.likes,
  781. dislikes: song.dislikes,
  782. skipDuration: song.skipDuration,
  783. thumbnail: song.thumbnail,
  784. requestedAt: song.requestedAt
  785. };
  786. }
  787. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  788. $set.startedAt = Date.now();
  789. $set.timePaused = 0;
  790. if (station.paused) $set.pausedAt = Date.now();
  791. next(null, $set, station);
  792. },
  793. ($set, station, next) => {
  794. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, err => {
  795. if (err) return next(err);
  796. return StationsModule.runJob(
  797. "UPDATE_STATION",
  798. {
  799. stationId: station._id
  800. },
  801. this
  802. )
  803. .then(station => {
  804. if (station.type === "community" && station.partyMode === true)
  805. CacheModule.runJob("PUB", {
  806. channel: "station.queueUpdate",
  807. value: payload.stationId
  808. })
  809. .then()
  810. .catch();
  811. next(null, station);
  812. })
  813. .catch(next);
  814. });
  815. }
  816. ],
  817. async (err, station) => {
  818. if (err) {
  819. err = await UtilsModule.runJob(
  820. "GET_ERROR",
  821. {
  822. error: err
  823. },
  824. this
  825. );
  826. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  827. reject(new Error(err));
  828. } else {
  829. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  830. station.currentSong.skipVotes = 0;
  831. }
  832. // TODO Pub/Sub this
  833. WSModule.runJob("EMIT_TO_ROOM", {
  834. room: `station.${station._id}`,
  835. args: [
  836. "event:songs.next",
  837. {
  838. currentSong: station.currentSong,
  839. startedAt: station.startedAt,
  840. paused: station.paused,
  841. timePaused: 0
  842. }
  843. ]
  844. })
  845. .then()
  846. .catch();
  847. if (station.privacy === "public") {
  848. WSModule.runJob("EMIT_TO_ROOM", {
  849. room: "home",
  850. args: ["event:station.nextSong", station._id, station.currentSong]
  851. })
  852. .then()
  853. .catch();
  854. } else {
  855. const sockets = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: "home" }, this);
  856. sockets.forEach(async socketId => {
  857. const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this);
  858. const { session } = socket;
  859. if (session.sessionId) {
  860. CacheModule.runJob(
  861. "HGET",
  862. { table: "sessions", key: session.sessionId },
  863. this
  864. // eslint-disable-next-line no-loop-func
  865. ).then(session => {
  866. if (session) {
  867. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  868. userModel => {
  869. userModel.findOne(
  870. {
  871. _id: session.userId
  872. },
  873. (err, user) => {
  874. if (!err && user) {
  875. if (user.role === "admin")
  876. socket.dispatch(
  877. "event:station.nextSong",
  878. station._id,
  879. station.currentSong
  880. );
  881. else if (
  882. station.type === "community" &&
  883. station.owner === session.userId
  884. )
  885. socket.dispatch(
  886. "event:station.nextSong",
  887. station._id,
  888. station.currentSong
  889. );
  890. }
  891. }
  892. );
  893. }
  894. );
  895. }
  896. });
  897. }
  898. });
  899. }
  900. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  901. WSModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  902. sockets: await WSModule.runJob(
  903. "GET_SOCKETS_FOR_ROOM",
  904. { room: `station.${station._id}` },
  905. this
  906. ),
  907. room: `song.${station.currentSong.songId}`
  908. });
  909. if (!station.paused) {
  910. NotificationsModule.runJob("SCHEDULE", {
  911. name: `stations.nextSong?id=${station._id}`,
  912. time: station.currentSong.duration * 1000,
  913. station
  914. });
  915. }
  916. } else {
  917. WSModule.runJob(
  918. "SOCKETS_LEAVE_SONG_ROOMS",
  919. {
  920. sockets: await WSModule.runJob(
  921. "GET_SOCKETS_FOR_ROOM",
  922. { room: `station.${station._id}` },
  923. this
  924. )
  925. },
  926. this
  927. ).then(() => {});
  928. }
  929. resolve({ station });
  930. }
  931. }
  932. );
  933. });
  934. }
  935. /**
  936. * Checks if a user can view/access a station
  937. *
  938. * @param {object} payload - object that contains the payload
  939. * @param {object} payload.station - the station object of the station in question
  940. * @param {string} payload.userId - the id of the user in question
  941. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  942. * @returns {Promise} - returns a promise (resolve, reject)
  943. */
  944. CAN_USER_VIEW_STATION(payload) {
  945. return new Promise((resolve, reject) => {
  946. async.waterfall(
  947. [
  948. next => {
  949. if (payload.station.privacy === "public") return next(true);
  950. if (payload.station.privacy === "unlisted")
  951. if (payload.hideUnlisted === true) return next();
  952. else return next(true);
  953. if (!payload.userId) return next("Not allowed");
  954. return next();
  955. },
  956. next => {
  957. DBModule.runJob(
  958. "GET_MODEL",
  959. {
  960. modelName: "user"
  961. },
  962. this
  963. ).then(userModel => {
  964. userModel.findOne({ _id: payload.userId }, next);
  965. });
  966. },
  967. (user, next) => {
  968. if (!user) return next("Not allowed");
  969. if (user.role === "admin") return next(true);
  970. if (payload.station.type === "official") return next("Not allowed");
  971. if (payload.station.owner === payload.userId) return next(true);
  972. return next("Not allowed");
  973. }
  974. ],
  975. async errOrResult => {
  976. if (errOrResult !== true && errOrResult !== "Not allowed") {
  977. errOrResult = await UtilsModule.runJob(
  978. "GET_ERROR",
  979. {
  980. error: errOrResult
  981. },
  982. this
  983. );
  984. reject(new Error(errOrResult));
  985. } else {
  986. resolve(errOrResult === true);
  987. }
  988. }
  989. );
  990. });
  991. }
  992. /**
  993. * Checks if a user has favorited a station or not
  994. *
  995. * @param {object} payload - object that contains the payload
  996. * @param {object} payload.stationId - the id of the station in question
  997. * @param {string} payload.userId - the id of the user in question
  998. * @returns {Promise} - returns a promise (resolve, reject)
  999. */
  1000. HAS_USER_FAVORITED_STATION(payload) {
  1001. return new Promise((resolve, reject) => {
  1002. async.waterfall(
  1003. [
  1004. next => {
  1005. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(userModel => {
  1006. userModel.findOne({ _id: payload.userId }, next);
  1007. });
  1008. },
  1009. (user, next) => {
  1010. if (!user) return next("User not found.");
  1011. if (user.favoriteStations.indexOf(payload.stationId) !== -1) return next(null, true);
  1012. return next(null, false);
  1013. }
  1014. ],
  1015. async (err, isStationFavorited) => {
  1016. if (err && err !== true) {
  1017. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1018. return reject(new Error(err));
  1019. }
  1020. return resolve(isStationFavorited);
  1021. }
  1022. );
  1023. });
  1024. }
  1025. /**
  1026. * Returns a list of sockets in a room that can and can't know about a station
  1027. *
  1028. * @param {object} payload - the payload object
  1029. * @param {object} payload.station - the station object
  1030. * @param {string} payload.room - the websockets room to get the sockets from
  1031. * @returns {Promise} - returns a promise (resolve, reject)
  1032. */
  1033. GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION(payload) {
  1034. return new Promise((resolve, reject) => {
  1035. WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: payload.room }, this)
  1036. .then(socketsObject => {
  1037. const sockets = Object.keys(socketsObject).map(socketKey => socketsObject[socketKey]);
  1038. let socketsThatCan = [];
  1039. const socketsThatCannot = [];
  1040. if (payload.station.privacy === "public") {
  1041. socketsThatCan = sockets;
  1042. resolve({ socketsThatCan, socketsThatCannot });
  1043. } else {
  1044. async.eachLimit(
  1045. sockets,
  1046. 1,
  1047. (socket, next) => {
  1048. const { session } = socket;
  1049. async.waterfall(
  1050. [
  1051. next => {
  1052. if (!session.sessionId) next("No session id");
  1053. else next();
  1054. },
  1055. next => {
  1056. CacheModule.runJob(
  1057. "HGET",
  1058. {
  1059. table: "sessions",
  1060. key: session.sessionId
  1061. },
  1062. this
  1063. )
  1064. .then(response => {
  1065. next(null, response);
  1066. })
  1067. .catch(next);
  1068. },
  1069. (session, next) => {
  1070. if (!session) next("No session");
  1071. else {
  1072. DBModule.runJob("GET_MODEL", { modelName: "user" }, this)
  1073. .then(userModel => {
  1074. next(null, userModel);
  1075. })
  1076. .catch(next);
  1077. }
  1078. },
  1079. (userModel, next) => {
  1080. if (!userModel) next("No user model");
  1081. else
  1082. userModel.findOne(
  1083. {
  1084. _id: session.userId
  1085. },
  1086. next
  1087. );
  1088. },
  1089. (user, next) => {
  1090. if (!user) next("No user found");
  1091. else if (user.role === "admin") {
  1092. socketsThatCan.push(socket);
  1093. next();
  1094. } else if (
  1095. payload.station.type === "community" &&
  1096. payload.station.owner === session.userId
  1097. ) {
  1098. socketsThatCan.push(socket);
  1099. next();
  1100. }
  1101. }
  1102. ],
  1103. err => {
  1104. if (err) socketsThatCannot.push(socket);
  1105. next();
  1106. }
  1107. );
  1108. },
  1109. err => {
  1110. if (err) reject(err);
  1111. else resolve({ socketsThatCan, socketsThatCannot });
  1112. }
  1113. );
  1114. }
  1115. })
  1116. .catch(reject);
  1117. });
  1118. }
  1119. }
  1120. export default new _StationsModule();