stations.js 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let IOModule;
  8. let SongsModule;
  9. let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("stations");
  14. StationsModule = this;
  15. }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. IOModule = this.moduleManager.modules.io;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1
  35. };
  36. this.userList = {};
  37. this.usersPerStation = {};
  38. this.usersPerStationCount = {};
  39. // TEMP
  40. CacheModule.runJob("SUB", {
  41. channel: "station.pause",
  42. cb: async stationId => {
  43. NotificationsModule.runJob("REMOVE", {
  44. subscription: `stations.nextSong?id=${stationId}`
  45. }).then();
  46. }
  47. });
  48. CacheModule.runJob("SUB", {
  49. channel: "station.resume",
  50. cb: async stationId => {
  51. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  52. }
  53. });
  54. CacheModule.runJob("SUB", {
  55. channel: "station.queueUpdate",
  56. cb: async stationId => {
  57. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  58. if (!station.currentSong && station.queue.length > 0) {
  59. StationsModule.runJob("INITIALIZE_STATION", {
  60. stationId
  61. }).then();
  62. }
  63. });
  64. }
  65. });
  66. CacheModule.runJob("SUB", {
  67. channel: "station.newOfficialPlaylist",
  68. cb: async stationId => {
  69. CacheModule.runJob("HGET", {
  70. table: "officialPlaylists",
  71. key: stationId
  72. }).then(playlistObj => {
  73. if (playlistObj) {
  74. IOModule.runJob("EMIT_TO_ROOM", {
  75. room: `station.${stationId}`,
  76. args: ["event:newOfficialPlaylist", playlistObj.songs]
  77. });
  78. }
  79. });
  80. }
  81. });
  82. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  83. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  84. return new Promise((resolve, reject) =>
  85. async.waterfall(
  86. [
  87. next => {
  88. this.setStage(2);
  89. CacheModule.runJob("HGETALL", { table: "stations" })
  90. .then(stations => {
  91. next(null, stations);
  92. })
  93. .catch(next);
  94. },
  95. (stations, next) => {
  96. this.setStage(3);
  97. if (!stations) return next();
  98. const stationIds = Object.keys(stations);
  99. return async.each(
  100. stationIds,
  101. (stationId, next) => {
  102. stationModel.findOne({ _id: stationId }, (err, station) => {
  103. if (err) next(err);
  104. else if (!station) {
  105. CacheModule.runJob("HDEL", {
  106. table: "stations",
  107. key: stationId
  108. })
  109. .then(() => {
  110. next();
  111. })
  112. .catch(next);
  113. } else next();
  114. });
  115. },
  116. next
  117. );
  118. },
  119. next => {
  120. this.setStage(4);
  121. stationModel.find({}, next);
  122. },
  123. (stations, next) => {
  124. this.setStage(5);
  125. async.each(
  126. stations,
  127. (station, next2) => {
  128. async.waterfall(
  129. [
  130. next => {
  131. CacheModule.runJob("HSET", {
  132. table: "stations",
  133. key: station._id,
  134. value: stationSchema(station)
  135. })
  136. .then(station => next(null, station))
  137. .catch(next);
  138. },
  139. (station, next) => {
  140. StationsModule.runJob(
  141. "INITIALIZE_STATION",
  142. {
  143. stationId: station._id
  144. },
  145. null,
  146. -1
  147. )
  148. .then(() => {
  149. next();
  150. })
  151. .catch(next);
  152. }
  153. ],
  154. err => {
  155. next2(err);
  156. }
  157. );
  158. },
  159. next
  160. );
  161. }
  162. ],
  163. async err => {
  164. if (err) {
  165. err = await UtilsModule.runJob("GET_ERROR", {
  166. error: err
  167. });
  168. reject(new Error(err));
  169. } else {
  170. resolve();
  171. }
  172. }
  173. )
  174. );
  175. }
  176. /**
  177. * Initialises a station
  178. *
  179. * @param {object} payload - object that contains the payload
  180. * @param {string} payload.stationId - id of the station to initialise
  181. * @returns {Promise} - returns a promise (resolve, reject)
  182. */
  183. INITIALIZE_STATION(payload) {
  184. return new Promise((resolve, reject) => {
  185. // if (typeof cb !== 'function') cb = ()=>{};
  186. async.waterfall(
  187. [
  188. next => {
  189. StationsModule.runJob(
  190. "GET_STATION",
  191. {
  192. stationId: payload.stationId
  193. },
  194. this
  195. )
  196. .then(station => {
  197. next(null, station);
  198. })
  199. .catch(next);
  200. },
  201. (station, next) => {
  202. if (!station) return next("Station not found.");
  203. return NotificationsModule.runJob(
  204. "UNSCHEDULE",
  205. {
  206. name: `stations.nextSong?id=${station._id}`
  207. },
  208. this
  209. )
  210. .then()
  211. .catch()
  212. .finally(() => {
  213. NotificationsModule.runJob("SUBSCRIBE", {
  214. name: `stations.nextSong?id=${station._id}`,
  215. cb: () =>
  216. StationsModule.runJob("SKIP_STATION", {
  217. stationId: station._id
  218. }),
  219. unique: true,
  220. station
  221. })
  222. .then()
  223. .catch();
  224. if (station.paused) return next(true, station);
  225. return next(null, station);
  226. });
  227. },
  228. (station, next) => {
  229. if (!station.currentSong) {
  230. return StationsModule.runJob(
  231. "SKIP_STATION",
  232. {
  233. stationId: station._id
  234. },
  235. this
  236. )
  237. .then(station => {
  238. next(true, station);
  239. })
  240. .catch(next)
  241. .finally(() => {});
  242. }
  243. let timeLeft =
  244. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  245. if (Number.isNaN(timeLeft)) timeLeft = -1;
  246. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  247. return StationsModule.runJob(
  248. "SKIP_STATION",
  249. {
  250. stationId: station._id
  251. },
  252. this
  253. )
  254. .then(station => {
  255. next(null, station);
  256. })
  257. .catch(next);
  258. }
  259. // name, time, cb, station
  260. NotificationsModule.runJob("SCHEDULE", {
  261. name: `stations.nextSong?id=${station._id}`,
  262. time: timeLeft,
  263. station
  264. });
  265. return next(null, station);
  266. }
  267. ],
  268. async (err, station) => {
  269. if (err && err !== true) {
  270. err = await UtilsModule.runJob(
  271. "GET_ERROR",
  272. {
  273. error: err
  274. },
  275. this
  276. );
  277. reject(new Error(err));
  278. } else resolve(station);
  279. }
  280. );
  281. });
  282. }
  283. /**
  284. * Calculates the next song for the station
  285. *
  286. * @param {object} payload - object that contains the payload
  287. * @param {string} payload.station - station object to calculate song for
  288. * @returns {Promise} - returns a promise (resolve, reject)
  289. */
  290. async CALCULATE_SONG_FOR_STATION(payload) {
  291. // station, bypassValidate = false
  292. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  293. return new Promise((resolve, reject) => {
  294. const songList = [];
  295. return async.waterfall(
  296. [
  297. next => {
  298. if (payload.station.genres.length === 0) return next();
  299. const genresDone = [];
  300. const blacklistedGenres = payload.station.blacklistedGenres.map(blacklistedGenre =>
  301. blacklistedGenre.toLowerCase()
  302. );
  303. return payload.station.genres.forEach(genre => {
  304. songModel.find({ genres: { $regex: genre, $options: "i" } }, (err, songs) => {
  305. if (!err) {
  306. songs.forEach(song => {
  307. if (songList.indexOf(song._id) === -1) {
  308. let found = false;
  309. song.genres.forEach(songGenre => {
  310. if (blacklistedGenres.indexOf(songGenre.toLowerCase()) !== -1)
  311. found = true;
  312. });
  313. if (!found) {
  314. songList.push(song._id);
  315. }
  316. }
  317. });
  318. }
  319. genresDone.push(genre);
  320. if (genresDone.length === payload.station.genres.length) next();
  321. });
  322. });
  323. },
  324. next => {
  325. const playlist = [];
  326. songList.forEach(songId => {
  327. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  328. });
  329. // eslint-disable-next-line array-callback-return
  330. payload.station.playlist.filter(songId => {
  331. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  332. });
  333. UtilsModule.runJob("SHUFFLE", { array: playlist })
  334. .then(result => {
  335. next(null, result.array);
  336. }, this)
  337. .catch(next);
  338. },
  339. (playlist, next) => {
  340. StationsModule.runJob(
  341. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  342. {
  343. stationId: payload.station._id,
  344. songList: playlist
  345. },
  346. this
  347. )
  348. .then(() => {
  349. next(null, playlist);
  350. })
  351. .catch(next);
  352. },
  353. (playlist, next) => {
  354. StationsModule.stationModel.updateOne(
  355. { _id: payload.station._id },
  356. { $set: { playlist } },
  357. { runValidators: true },
  358. () => {
  359. StationsModule.runJob(
  360. "UPDATE_STATION",
  361. {
  362. stationId: payload.station._id
  363. },
  364. this
  365. )
  366. .then(() => {
  367. next(null, playlist);
  368. })
  369. .catch(next);
  370. }
  371. );
  372. }
  373. ],
  374. (err, newPlaylist) => {
  375. if (err) return reject(new Error(err));
  376. return resolve(newPlaylist);
  377. }
  378. );
  379. });
  380. }
  381. /**
  382. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  383. *
  384. * @param {object} payload - object that contains the payload
  385. * @param {string} payload.stationId - id of the station
  386. * @returns {Promise} - returns a promise (resolve, reject)
  387. */
  388. GET_STATION(payload) {
  389. return new Promise((resolve, reject) => {
  390. async.waterfall(
  391. [
  392. next => {
  393. CacheModule.runJob(
  394. "HGET",
  395. {
  396. table: "stations",
  397. key: payload.stationId
  398. },
  399. this
  400. )
  401. .then(station => {
  402. next(null, station);
  403. })
  404. .catch(next);
  405. },
  406. (station, next) => {
  407. if (station) return next(true, station);
  408. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  409. },
  410. (station, next) => {
  411. if (station) {
  412. if (station.type === "official") {
  413. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  414. stationId: station._id,
  415. songList: station.playlist
  416. })
  417. .then()
  418. .catch();
  419. }
  420. station = StationsModule.stationSchema(station);
  421. CacheModule.runJob("HSET", {
  422. table: "stations",
  423. key: payload.stationId,
  424. value: station
  425. })
  426. .then()
  427. .catch();
  428. next(true, station);
  429. } else next("Station not found");
  430. }
  431. ],
  432. async (err, station) => {
  433. if (err && err !== true) {
  434. err = await UtilsModule.runJob(
  435. "GET_ERROR",
  436. {
  437. error: err
  438. },
  439. this
  440. );
  441. reject(new Error(err));
  442. } else resolve(station);
  443. }
  444. );
  445. });
  446. }
  447. /**
  448. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  449. *
  450. * @param {object} payload - object that contains the payload
  451. * @param {string} payload.stationName - the unique name of the station
  452. * @returns {Promise} - returns a promise (resolve, reject)
  453. */
  454. async GET_STATION_BY_NAME(payload) {
  455. return new Promise((resolve, reject) =>
  456. async.waterfall(
  457. [
  458. next => {
  459. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  460. },
  461. (station, next) => {
  462. if (station) {
  463. if (station.type === "official") {
  464. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  465. stationId: station._id,
  466. songList: station.playlist
  467. });
  468. }
  469. station = StationsModule.stationSchema(station);
  470. CacheModule.runJob("HSET", {
  471. table: "stations",
  472. key: station._id,
  473. value: station
  474. });
  475. next(true, station);
  476. } else next("Station not found");
  477. }
  478. ],
  479. (err, station) => {
  480. if (err && err !== true) return reject(new Error(err));
  481. return resolve(station);
  482. }
  483. )
  484. );
  485. }
  486. /**
  487. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  488. *
  489. * @param {object} payload - object that contains the payload
  490. * @param {string} payload.stationId - the id of the station to update
  491. * @returns {Promise} - returns a promise (resolve, reject)
  492. */
  493. UPDATE_STATION(payload) {
  494. return new Promise((resolve, reject) => {
  495. async.waterfall(
  496. [
  497. next => {
  498. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  499. },
  500. (station, next) => {
  501. if (!station) {
  502. CacheModule.runJob("HDEL", {
  503. table: "stations",
  504. key: payload.stationId
  505. })
  506. .then()
  507. .catch();
  508. return next("Station not found");
  509. }
  510. return CacheModule.runJob(
  511. "HSET",
  512. {
  513. table: "stations",
  514. key: payload.stationId,
  515. value: station
  516. },
  517. this
  518. )
  519. .then(station => {
  520. next(null, station);
  521. })
  522. .catch(next);
  523. }
  524. ],
  525. async (err, station) => {
  526. if (err && err !== true) {
  527. err = await UtilsModule.runJob(
  528. "GET_ERROR",
  529. {
  530. error: err
  531. },
  532. this
  533. );
  534. reject(new Error(err));
  535. } else resolve(station);
  536. }
  537. );
  538. });
  539. }
  540. /**
  541. * Creates the official playlist for a station
  542. *
  543. * @param {object} payload - object that contains the payload
  544. * @param {string} payload.stationId - the id of the station
  545. * @param {Array} payload.songList - list of songs to put in official playlist
  546. * @returns {Promise} - returns a promise (resolve, reject)
  547. */
  548. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  549. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  550. console.log(typeof payload.songList, payload.songList);
  551. return new Promise(resolve => {
  552. const lessInfoPlaylist = [];
  553. return async.each(
  554. payload.songList,
  555. (song, next) => {
  556. SongsModule.runJob("GET_SONG", { id: song }, this)
  557. .then(response => {
  558. const { song } = response;
  559. if (song) {
  560. const newSong = {
  561. songId: song.songId,
  562. title: song.title,
  563. artists: song.artists,
  564. duration: song.duration,
  565. thumbnail: song.thumbnail
  566. };
  567. lessInfoPlaylist.push(newSong);
  568. }
  569. })
  570. .finally(() => {
  571. next();
  572. });
  573. },
  574. () => {
  575. CacheModule.runJob(
  576. "HSET",
  577. {
  578. table: "officialPlaylists",
  579. key: payload.stationId,
  580. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  581. },
  582. this
  583. ).finally(() => {
  584. CacheModule.runJob("PUB", {
  585. channel: "station.newOfficialPlaylist",
  586. value: payload.stationId
  587. });
  588. resolve();
  589. });
  590. }
  591. );
  592. });
  593. }
  594. /**
  595. * Skips a station
  596. *
  597. * @param {object} payload - object that contains the payload
  598. * @param {string} payload.stationId - the id of the station to skip
  599. * @returns {Promise} - returns a promise (resolve, reject)
  600. */
  601. SKIP_STATION(payload) {
  602. return new Promise((resolve, reject) => {
  603. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  604. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  605. async.waterfall(
  606. [
  607. next => {
  608. NotificationsModule.runJob("UNSCHEDULE", {
  609. name: `stations.nextSong?id=${payload.stationId}`
  610. })
  611. .then(() => {
  612. next();
  613. })
  614. .catch(next);
  615. },
  616. next => {
  617. StationsModule.runJob(
  618. "GET_STATION",
  619. {
  620. stationId: payload.stationId
  621. },
  622. this
  623. )
  624. .then(station => {
  625. next(null, station);
  626. })
  627. .catch(() => {});
  628. },
  629. // eslint-disable-next-line consistent-return
  630. (station, next) => {
  631. if (!station) return next("Station not found.");
  632. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  633. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  634. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  635. // Community station with party mode enabled and songs in the queue
  636. if (station.paused) return next(null, null, -19, station);
  637. return StationsModule.stationModel.updateOne(
  638. { _id: payload.stationId },
  639. {
  640. $pull: {
  641. queue: {
  642. _id: station.queue[0]._id
  643. }
  644. }
  645. },
  646. err => {
  647. if (err) return next(err);
  648. return next(null, station.queue[0], -12, station);
  649. }
  650. );
  651. }
  652. if (station.type === "community" && !station.partyMode) {
  653. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  654. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  655. if (err) return next(err);
  656. if (!playlist) return next(null, null, -13, station);
  657. playlist = playlist.songs;
  658. if (playlist.length > 0) {
  659. let currentSongIndex;
  660. if (station.currentSongIndex < playlist.length - 1)
  661. currentSongIndex = station.currentSongIndex + 1;
  662. else currentSongIndex = 0;
  663. const callback = (err, song) => {
  664. if (err) return next(err);
  665. if (song) return next(null, song, currentSongIndex, station);
  666. const currentSong = {
  667. songId: playlist[currentSongIndex].songId,
  668. title: playlist[currentSongIndex].title,
  669. duration: playlist[currentSongIndex].duration,
  670. likes: -1,
  671. dislikes: -1
  672. };
  673. return next(null, currentSong, currentSongIndex, station);
  674. };
  675. if (playlist[currentSongIndex]._id)
  676. return SongsModule.runJob(
  677. "GET_SONG",
  678. {
  679. id: playlist[currentSongIndex]._id
  680. },
  681. this
  682. )
  683. .then(response => callback(null, response.song))
  684. .catch(callback);
  685. return SongsModule.runJob(
  686. "GET_SONG_FROM_ID",
  687. {
  688. songId: playlist[currentSongIndex].songId
  689. },
  690. this
  691. )
  692. .then(response => callback(null, response.song))
  693. .catch(callback);
  694. }
  695. return next(null, null, -14, station);
  696. })
  697. );
  698. }
  699. if (station.type === "official" && station.playlist.length === 0) {
  700. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  701. .then(playlist => {
  702. if (playlist.length === 0)
  703. return next(null, StationsModule.defaultSong, 0, station);
  704. return SongsModule.runJob(
  705. "GET_SONG",
  706. {
  707. id: playlist[0]
  708. },
  709. this
  710. )
  711. .then(response => {
  712. next(null, response.song, 0, station);
  713. })
  714. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  715. })
  716. .catch(err => {
  717. next(err);
  718. });
  719. }
  720. if (station.type === "official" && station.playlist.length > 0) {
  721. return async.doUntil(
  722. next => {
  723. if (station.currentSongIndex < station.playlist.length - 1) {
  724. SongsModule.runJob(
  725. "GET_SONG",
  726. {
  727. id: station.playlist[station.currentSongIndex + 1]
  728. },
  729. this
  730. )
  731. .then(response => next(null, response.song, station.currentSongIndex + 1))
  732. .catch(() => {
  733. station.currentSongIndex += 1;
  734. next(null, null, null);
  735. });
  736. } else {
  737. StationsModule.runJob(
  738. "CALCULATE_SONG_FOR_STATION",
  739. {
  740. station
  741. },
  742. this
  743. )
  744. .then(newPlaylist => {
  745. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  746. .then(response => {
  747. station.playlist = newPlaylist;
  748. next(null, response.song, 0);
  749. })
  750. .catch(() => next(null, StationsModule.defaultSong, 0));
  751. })
  752. .catch(() => {
  753. next(null, StationsModule.defaultSong, 0);
  754. });
  755. }
  756. },
  757. (song, currentSongIndex, next) => {
  758. if (song) return next(null, true, currentSongIndex);
  759. return next(null, false);
  760. },
  761. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  762. );
  763. }
  764. },
  765. (song, currentSongIndex, station, next) => {
  766. const $set = {};
  767. if (song === null) $set.currentSong = null;
  768. else if (song.likes === -1 && song.dislikes === -1) {
  769. $set.currentSong = {
  770. songId: song.songId,
  771. title: song.title,
  772. duration: song.duration,
  773. skipDuration: 0,
  774. likes: -1,
  775. dislikes: -1
  776. };
  777. } else {
  778. $set.currentSong = {
  779. songId: song.songId,
  780. title: song.title,
  781. artists: song.artists,
  782. duration: song.duration,
  783. likes: song.likes,
  784. dislikes: song.dislikes,
  785. skipDuration: song.skipDuration,
  786. thumbnail: song.thumbnail
  787. };
  788. }
  789. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  790. $set.startedAt = Date.now();
  791. $set.timePaused = 0;
  792. if (station.paused) $set.pausedAt = Date.now();
  793. next(null, $set, station);
  794. },
  795. ($set, station, next) => {
  796. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  797. StationsModule.runJob(
  798. "UPDATE_STATION",
  799. {
  800. stationId: station._id
  801. },
  802. this
  803. )
  804. .then(station => {
  805. if (station.type === "community" && station.partyMode === true)
  806. CacheModule.runJob("PUB", {
  807. channel: "station.queueUpdate",
  808. value: payload.stationId
  809. })
  810. .then()
  811. .catch();
  812. next(null, station);
  813. })
  814. .catch(next);
  815. });
  816. }
  817. ],
  818. async (err, station) => {
  819. if (err) {
  820. err = await UtilsModule.runJob(
  821. "GET_ERROR",
  822. {
  823. error: err
  824. },
  825. this
  826. );
  827. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  828. reject(new Error(err));
  829. } else {
  830. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  831. station.currentSong.skipVotes = 0;
  832. }
  833. // TODO Pub/Sub this
  834. IOModule.runJob("EMIT_TO_ROOM", {
  835. room: `station.${station._id}`,
  836. args: [
  837. "event:songs.next",
  838. {
  839. currentSong: station.currentSong,
  840. startedAt: station.startedAt,
  841. paused: station.paused,
  842. timePaused: 0
  843. }
  844. ]
  845. })
  846. .then()
  847. .catch();
  848. if (station.privacy === "public") {
  849. IOModule.runJob("EMIT_TO_ROOM", {
  850. room: "home",
  851. args: ["event:station.nextSong", station._id, station.currentSong]
  852. })
  853. .then()
  854. .catch();
  855. } else {
  856. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  857. Object.keys(sockets).forEach(socketKey => {
  858. const socket = sockets[socketKey];
  859. const { session } = socket;
  860. if (session.sessionId) {
  861. CacheModule.runJob(
  862. "HGET",
  863. {
  864. table: "sessions",
  865. key: session.sessionId
  866. },
  867. this
  868. // eslint-disable-next-line no-loop-func
  869. ).then(session => {
  870. if (session) {
  871. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  872. userModel => {
  873. userModel.findOne(
  874. {
  875. _id: session.userId
  876. },
  877. (err, user) => {
  878. if (!err && user) {
  879. if (user.role === "admin")
  880. socket.emit(
  881. "event:station.nextSong",
  882. station._id,
  883. station.currentSong
  884. );
  885. else if (
  886. station.type === "community" &&
  887. station.owner === session.userId
  888. )
  889. socket.emit(
  890. "event:station.nextSong",
  891. station._id,
  892. station.currentSong
  893. );
  894. }
  895. }
  896. );
  897. }
  898. );
  899. }
  900. });
  901. }
  902. });
  903. }
  904. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  905. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  906. sockets: await IOModule.runJob(
  907. "GET_ROOM_SOCKETS",
  908. {
  909. room: `station.${station._id}`
  910. },
  911. this
  912. ),
  913. room: `song.${station.currentSong.songId}`
  914. });
  915. if (!station.paused) {
  916. NotificationsModule.runJob("SCHEDULE", {
  917. name: `stations.nextSong?id=${station._id}`,
  918. time: station.currentSong.duration * 1000,
  919. station
  920. });
  921. }
  922. } else {
  923. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  924. sockets: await IOModule.runJob(
  925. "GET_ROOM_SOCKETS",
  926. {
  927. room: `station.${station._id}`
  928. },
  929. this
  930. )
  931. })
  932. .then()
  933. .catch();
  934. }
  935. resolve({ station });
  936. }
  937. }
  938. );
  939. });
  940. }
  941. /**
  942. * Checks if a user can view/access a station
  943. *
  944. * @param {object} payload - object that contains the payload
  945. * @param {object} payload.station - the station object of the station in question
  946. * @param {string} payload.userId - the id of the user in question
  947. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  948. * @returns {Promise} - returns a promise (resolve, reject)
  949. */
  950. CAN_USER_VIEW_STATION(payload) {
  951. return new Promise((resolve, reject) => {
  952. async.waterfall(
  953. [
  954. next => {
  955. if (payload.station.privacy === "public") return next(true);
  956. if (payload.station.privacy === "unlisted")
  957. if (payload.hideUnlisted === true) return next();
  958. else return next(true);
  959. if (!payload.userId) return next("Not allowed");
  960. return next();
  961. },
  962. next => {
  963. DBModule.runJob(
  964. "GET_MODEL",
  965. {
  966. modelName: "user"
  967. },
  968. this
  969. ).then(userModel => {
  970. userModel.findOne({ _id: payload.userId }, next);
  971. });
  972. },
  973. (user, next) => {
  974. if (!user) return next("Not allowed");
  975. if (user.role === "admin") return next(true);
  976. if (payload.station.type === "official") return next("Not allowed");
  977. if (payload.station.owner === payload.userId) return next(true);
  978. return next("Not allowed");
  979. }
  980. ],
  981. async errOrResult => {
  982. if (errOrResult !== true && errOrResult !== "Not allowed") {
  983. errOrResult = await UtilsModule.runJob(
  984. "GET_ERROR",
  985. {
  986. error: errOrResult
  987. },
  988. this
  989. );
  990. reject(new Error(errOrResult));
  991. } else {
  992. resolve(errOrResult === true);
  993. }
  994. }
  995. );
  996. });
  997. }
  998. }
  999. export default new _StationsModule();