stations.js 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082
  1. import async from "async";
  2. import CoreClass from "../core";
  // Module-level handles used by every job method in this file.
  // They are declared empty here: StationsModule is assigned in the constructor,
  // the remaining handles are assigned from this.moduleManager.modules in initialize().
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let IOModule;
  8. let SongsModule;
  9. let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  // Passes "stations" to the CoreClass constructor (presumably the module name - confirm
  // against CoreClass) and stores the instance in the module-level StationsModule variable,
  // which the job methods below use instead of `this`.
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("stations");
  14. StationsModule = this;
  15. }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. IOModule = this.moduleManager.modules.io;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1
  35. };
  36. this.userList = {};
  37. this.usersPerStation = {};
  38. this.usersPerStationCount = {};
  39. // TEMP
  40. CacheModule.runJob("SUB", {
  41. channel: "station.pause",
  42. cb: async stationId => {
  43. NotificationsModule.runJob("REMOVE", {
  44. subscription: `stations.nextSong?id=${stationId}`
  45. }).then();
  46. }
  47. });
  48. CacheModule.runJob("SUB", {
  49. channel: "station.resume",
  50. cb: async stationId => {
  51. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  52. }
  53. });
  54. CacheModule.runJob("SUB", {
  55. channel: "station.queueUpdate",
  56. cb: async stationId => {
  57. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  58. if (!station.currentSong && station.queue.length > 0) {
  59. StationsModule.runJob("INITIALIZE_STATION", {
  60. stationId
  61. }).then();
  62. }
  63. });
  64. }
  65. });
  66. CacheModule.runJob("SUB", {
  67. channel: "station.newOfficialPlaylist",
  68. cb: async stationId => {
  69. CacheModule.runJob("HGET", {
  70. table: "officialPlaylists",
  71. key: stationId
  72. }).then(playlistObj => {
  73. if (playlistObj) {
  74. IOModule.runJob("EMIT_TO_ROOM", {
  75. room: `station.${stationId}`,
  76. args: ["event:newOfficialPlaylist", playlistObj.songs]
  77. });
  78. }
  79. });
  80. }
  81. });
  82. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  83. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  84. return new Promise((resolve, reject) =>
  85. async.waterfall(
  86. [
  87. next => {
  88. this.setStage(2);
  89. CacheModule.runJob("HGETALL", { table: "stations" })
  90. .then(stations => {
  91. next(null, stations);
  92. })
  93. .catch(next);
  94. },
  95. (stations, next) => {
  96. this.setStage(3);
  97. if (!stations) return next();
  98. const stationIds = Object.keys(stations);
  99. return async.each(
  100. stationIds,
  101. (stationId, next) => {
  102. stationModel.findOne({ _id: stationId }, (err, station) => {
  103. if (err) next(err);
  104. else if (!station) {
  105. CacheModule.runJob("HDEL", {
  106. table: "stations",
  107. key: stationId
  108. })
  109. .then(() => {
  110. next();
  111. })
  112. .catch(next);
  113. } else next();
  114. });
  115. },
  116. next
  117. );
  118. },
  119. next => {
  120. this.setStage(4);
  121. stationModel.find({}, next);
  122. },
  123. (stations, next) => {
  124. this.setStage(5);
  125. async.each(
  126. stations,
  127. (station, next2) => {
  128. async.waterfall(
  129. [
  130. next => {
  131. CacheModule.runJob("HSET", {
  132. table: "stations",
  133. key: station._id,
  134. value: stationSchema(station)
  135. })
  136. .then(station => next(null, station))
  137. .catch(next);
  138. },
  139. (station, next) => {
  140. StationsModule.runJob(
  141. "INITIALIZE_STATION",
  142. {
  143. stationId: station._id
  144. },
  145. null,
  146. -1
  147. )
  148. .then(() => {
  149. next();
  150. })
  151. .catch(next);
  152. }
  153. ],
  154. err => {
  155. next2(err);
  156. }
  157. );
  158. },
  159. next
  160. );
  161. }
  162. ],
  163. async err => {
  164. if (err) {
  165. err = await UtilsModule.runJob("GET_ERROR", {
  166. error: err
  167. });
  168. reject(new Error(err));
  169. } else {
  170. resolve();
  171. }
  172. }
  173. )
  174. );
  175. }
  176. /**
  177. * Initialises a station
  178. *
  179. * @param {object} payload - object that contains the payload
  180. * @param {string} payload.stationId - id of the station to initialise
  181. * @returns {Promise} - returns a promise (resolve, reject)
  182. */
  183. INITIALIZE_STATION(payload) {
  184. return new Promise((resolve, reject) => {
  185. // if (typeof cb !== 'function') cb = ()=>{};
  186. async.waterfall(
  187. [
  188. next => {
  189. StationsModule.runJob(
  190. "GET_STATION",
  191. {
  192. stationId: payload.stationId
  193. },
  194. this
  195. )
  196. .then(station => {
  197. next(null, station);
  198. })
  199. .catch(next);
  200. },
  201. (station, next) => {
  202. if (!station) return next("Station not found.");
  203. return NotificationsModule.runJob(
  204. "UNSCHEDULE",
  205. {
  206. name: `stations.nextSong?id=${station._id}`
  207. },
  208. this
  209. )
  210. .then()
  211. .catch()
  212. .finally(() => {
  213. NotificationsModule.runJob(
  214. "SUBSCRIBE",
  215. {
  216. name: `stations.nextSong?id=${station._id}`,
  217. cb: () =>
  218. StationsModule.runJob("SKIP_STATION", {
  219. stationId: station._id
  220. }),
  221. unique: true,
  222. station
  223. },
  224. this
  225. )
  226. .then()
  227. .catch();
  228. if (station.paused) return next(true, station);
  229. return next(null, station);
  230. });
  231. },
  232. (station, next) => {
  233. if (!station.currentSong) {
  234. return StationsModule.runJob(
  235. "SKIP_STATION",
  236. {
  237. stationId: station._id
  238. },
  239. this
  240. )
  241. .then(station => {
  242. next(true, station);
  243. })
  244. .catch(next)
  245. .finally(() => {});
  246. }
  247. let timeLeft =
  248. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  249. if (Number.isNaN(timeLeft)) timeLeft = -1;
  250. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  251. return StationsModule.runJob(
  252. "SKIP_STATION",
  253. {
  254. stationId: station._id
  255. },
  256. this
  257. )
  258. .then(station => {
  259. next(null, station);
  260. })
  261. .catch(next);
  262. }
  263. // name, time, cb, station
  264. NotificationsModule.runJob("SCHEDULE", {
  265. name: `stations.nextSong?id=${station._id}`,
  266. time: timeLeft,
  267. station
  268. });
  269. return next(null, station);
  270. }
  271. ],
  272. async (err, station) => {
  273. if (err && err !== true) {
  274. err = await UtilsModule.runJob(
  275. "GET_ERROR",
  276. {
  277. error: err
  278. },
  279. this
  280. );
  281. reject(new Error(err));
  282. } else resolve(station);
  283. }
  284. );
  285. });
  286. }
  287. /**
  288. * Calculates the next song for the station
  289. *
  290. * @param {object} payload - object that contains the payload
  291. * @param {string} payload.station - station object to calculate song for
  292. * @returns {Promise} - returns a promise (resolve, reject)
  293. */
  294. async CALCULATE_SONG_FOR_STATION(payload) {
  295. // station, bypassValidate = false
  296. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  297. return new Promise((resolve, reject) => {
  298. const songList = [];
  299. return async.waterfall(
  300. [
  301. next => {
  302. if (payload.station.genres.length === 0) return next();
  303. const genresDone = [];
  304. return payload.station.genres.forEach(genre => {
  305. songModel.find({ genres: genre }, (err, songs) => {
  306. if (!err) {
  307. songs.forEach(song => {
  308. if (songList.indexOf(song._id) === -1) {
  309. let found = false;
  310. song.genres.forEach(songGenre => {
  311. if (payload.station.blacklistedGenres.indexOf(songGenre) !== -1)
  312. found = true;
  313. });
  314. if (!found) {
  315. songList.push(song._id);
  316. }
  317. }
  318. });
  319. }
  320. genresDone.push(genre);
  321. if (genresDone.length === payload.station.genres.length) next();
  322. });
  323. });
  324. },
  325. next => {
  326. const playlist = [];
  327. songList.forEach(songId => {
  328. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  329. });
  330. // eslint-disable-next-line array-callback-return
  331. payload.station.playlist.filter(songId => {
  332. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  333. });
  334. UtilsModule.runJob("SHUFFLE", { array: playlist })
  335. .then(result => {
  336. next(null, result.array);
  337. }, this)
  338. .catch(next);
  339. },
  340. (playlist, next) => {
  341. StationsModule.runJob(
  342. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  343. {
  344. stationId: payload.station._id,
  345. songList: playlist
  346. },
  347. this
  348. )
  349. .then(() => {
  350. next(null, playlist);
  351. })
  352. .catch(next);
  353. },
  354. (playlist, next) => {
  355. StationsModule.stationModel.updateOne(
  356. { _id: payload.station._id },
  357. { $set: { playlist } },
  358. { runValidators: true },
  359. () => {
  360. StationsModule.runJob(
  361. "UPDATE_STATION",
  362. {
  363. stationId: payload.station._id
  364. },
  365. this
  366. )
  367. .then(() => {
  368. next(null, playlist);
  369. })
  370. .catch(next);
  371. }
  372. );
  373. }
  374. ],
  375. (err, newPlaylist) => {
  376. if (err) return reject(new Error(err));
  377. return resolve(newPlaylist);
  378. }
  379. );
  380. });
  381. }
  382. /**
  383. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  384. *
  385. * @param {object} payload - object that contains the payload
  386. * @param {string} payload.stationId - id of the station
  387. * @returns {Promise} - returns a promise (resolve, reject)
  388. */
  389. GET_STATION(payload) {
  390. return new Promise((resolve, reject) => {
  391. async.waterfall(
  392. [
  393. next => {
  394. CacheModule.runJob(
  395. "HGET",
  396. {
  397. table: "stations",
  398. key: payload.stationId
  399. },
  400. this
  401. )
  402. .then(station => {
  403. next(null, station);
  404. })
  405. .catch(next);
  406. },
  407. (station, next) => {
  408. if (station) return next(true, station);
  409. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  410. },
  411. (station, next) => {
  412. if (station) {
  413. if (station.type === "official") {
  414. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  415. stationId: station._id,
  416. songList: station.playlist
  417. })
  418. .then()
  419. .catch();
  420. }
  421. station = StationsModule.stationSchema(station);
  422. CacheModule.runJob("HSET", {
  423. table: "stations",
  424. key: payload.stationId,
  425. value: station
  426. })
  427. .then()
  428. .catch();
  429. next(true, station);
  430. } else next("Station not found");
  431. }
  432. ],
  433. async (err, station) => {
  434. if (err && err !== true) {
  435. err = await UtilsModule.runJob(
  436. "GET_ERROR",
  437. {
  438. error: err
  439. },
  440. this
  441. );
  442. reject(new Error(err));
  443. } else resolve(station);
  444. }
  445. );
  446. });
  447. }
  448. /**
  449. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  450. *
  451. * @param {object} payload - object that contains the payload
  452. * @param {string} payload.stationName - the unique name of the station
  453. * @returns {Promise} - returns a promise (resolve, reject)
  454. */
  455. async GET_STATION_BY_NAME(payload) {
  456. return new Promise((resolve, reject) =>
  457. async.waterfall(
  458. [
  459. next => {
  460. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  461. },
  462. (station, next) => {
  463. if (station) {
  464. if (station.type === "official") {
  465. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  466. stationId: station._id,
  467. songList: station.playlist
  468. });
  469. }
  470. station = StationsModule.stationSchema(station);
  471. CacheModule.runJob("HSET", {
  472. table: "stations",
  473. key: station._id,
  474. value: station
  475. });
  476. next(true, station);
  477. } else next("Station not found");
  478. }
  479. ],
  480. (err, station) => {
  481. if (err && err !== true) return reject(new Error(err));
  482. return resolve(station);
  483. }
  484. )
  485. );
  486. }
  487. /**
  488. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  489. *
  490. * @param {object} payload - object that contains the payload
  491. * @param {string} payload.stationId - the id of the station to update
  492. * @returns {Promise} - returns a promise (resolve, reject)
  493. */
  494. UPDATE_STATION(payload) {
  495. return new Promise((resolve, reject) => {
  496. async.waterfall(
  497. [
  498. next => {
  499. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  500. },
  501. (station, next) => {
  502. if (!station) {
  503. CacheModule.runJob("HDEL", {
  504. table: "stations",
  505. key: payload.stationId
  506. })
  507. .then()
  508. .catch();
  509. return next("Station not found");
  510. }
  511. return CacheModule.runJob(
  512. "HSET",
  513. {
  514. table: "stations",
  515. key: payload.stationId,
  516. value: station
  517. },
  518. this
  519. )
  520. .then(station => {
  521. next(null, station);
  522. })
  523. .catch(next);
  524. }
  525. ],
  526. async (err, station) => {
  527. if (err && err !== true) {
  528. err = await UtilsModule.runJob(
  529. "GET_ERROR",
  530. {
  531. error: err
  532. },
  533. this
  534. );
  535. reject(new Error(err));
  536. } else resolve(station);
  537. }
  538. );
  539. });
  540. }
  541. /**
  542. * Creates the official playlist for a station
  543. *
  544. * @param {object} payload - object that contains the payload
  545. * @param {string} payload.stationId - the id of the station
  546. * @param {Array} payload.songList - list of songs to put in official playlist
  547. * @returns {Promise} - returns a promise (resolve, reject)
  548. */
  549. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  550. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  551. console.log(typeof payload.songList, payload.songList);
  552. return new Promise(resolve => {
  553. const lessInfoPlaylist = [];
  554. return async.each(
  555. payload.songList,
  556. (song, next) => {
  557. SongsModule.runJob("GET_SONG", { id: song }, this)
  558. .then(response => {
  559. const { song } = response;
  560. if (song) {
  561. const newSong = {
  562. songId: song.songId,
  563. title: song.title,
  564. artists: song.artists,
  565. duration: song.duration
  566. };
  567. lessInfoPlaylist.push(newSong);
  568. }
  569. })
  570. .finally(() => {
  571. next();
  572. });
  573. },
  574. () => {
  575. CacheModule.runJob(
  576. "HSET",
  577. {
  578. table: "officialPlaylists",
  579. key: payload.stationId,
  580. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  581. },
  582. this
  583. ).finally(() => {
  584. CacheModule.runJob("PUB", {
  585. channel: "station.newOfficialPlaylist",
  586. value: payload.stationId
  587. });
  588. resolve();
  589. });
  590. }
  591. );
  592. });
  593. }
  594. /**
  595. * Skips a station
  596. *
  597. * @param {object} payload - object that contains the payload
  598. * @param {string} payload.stationId - the id of the station to skip
  599. * @returns {Promise} - returns a promise (resolve, reject)
  600. */
  601. SKIP_STATION(payload) {
  602. return new Promise((resolve, reject) => {
  603. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  604. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  605. async.waterfall(
  606. [
  607. next => {
  608. NotificationsModule.runJob("UNSCHEDULE", {
  609. name: `stations.nextSong?id=${payload.stationId}`
  610. })
  611. .then(() => {
  612. next();
  613. })
  614. .catch(next);
  615. },
  616. next => {
  617. StationsModule.runJob(
  618. "GET_STATION",
  619. {
  620. stationId: payload.stationId
  621. },
  622. this
  623. )
  624. .then(station => {
  625. next(null, station);
  626. })
  627. .catch(() => {});
  628. },
  629. // eslint-disable-next-line consistent-return
  630. (station, next) => {
  631. if (!station) return next("Station not found.");
  632. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  633. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  634. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  635. // Community station with party mode enabled and songs in the queue
  636. if (station.paused) return next(null, null, -19, station);
  637. return StationsModule.stationModel.updateOne(
  638. { _id: payload.stationId },
  639. {
  640. $pull: {
  641. queue: {
  642. _id: station.queue[0]._id
  643. }
  644. }
  645. },
  646. err => {
  647. if (err) return next(err);
  648. return next(null, station.queue[0], -12, station);
  649. }
  650. );
  651. }
  652. if (station.type === "community" && !station.partyMode) {
  653. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  654. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  655. if (err) return next(err);
  656. if (!playlist) return next(null, null, -13, station);
  657. playlist = playlist.songs;
  658. if (playlist.length > 0) {
  659. let currentSongIndex;
  660. if (station.currentSongIndex < playlist.length - 1)
  661. currentSongIndex = station.currentSongIndex + 1;
  662. else currentSongIndex = 0;
  663. const callback = (err, song) => {
  664. if (err) return next(err);
  665. if (song) return next(null, song, currentSongIndex, station);
  666. const currentSong = {
  667. songId: playlist[currentSongIndex].songId,
  668. title: playlist[currentSongIndex].title,
  669. duration: playlist[currentSongIndex].duration,
  670. likes: -1,
  671. dislikes: -1
  672. };
  673. return next(null, currentSong, currentSongIndex, station);
  674. };
  675. if (playlist[currentSongIndex]._id)
  676. return SongsModule.runJob(
  677. "GET_SONG",
  678. {
  679. id: playlist[currentSongIndex]._id
  680. },
  681. this
  682. )
  683. .then(response => callback(null, response.song))
  684. .catch(callback);
  685. return SongsModule.runJob(
  686. "GET_SONG_FROM_ID",
  687. {
  688. songId: playlist[currentSongIndex].songId
  689. },
  690. this
  691. )
  692. .then(response => callback(null, response.song))
  693. .catch(callback);
  694. }
  695. return next(null, null, -14, station);
  696. })
  697. );
  698. }
  699. if (station.type === "official" && station.playlist.length === 0) {
  700. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  701. .then(playlist => {
  702. if (playlist.length === 0)
  703. return next(null, StationsModule.defaultSong, 0, station);
  704. return SongsModule.runJob(
  705. "GET_SONG",
  706. {
  707. id: playlist[0]
  708. },
  709. this
  710. )
  711. .then(response => {
  712. next(null, response.song, 0, station);
  713. })
  714. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  715. })
  716. .catch(err => {
  717. next(err);
  718. });
  719. }
  720. if (station.type === "official" && station.playlist.length > 0) {
  721. return async.doUntil(
  722. next => {
  723. if (station.currentSongIndex < station.playlist.length - 1) {
  724. SongsModule.runJob(
  725. "GET_SONG",
  726. {
  727. id: station.playlist[station.currentSongIndex + 1]
  728. },
  729. this
  730. )
  731. .then(response => next(null, response.song, station.currentSongIndex + 1))
  732. .catch(() => {
  733. station.currentSongIndex += 1;
  734. next(null, null, null);
  735. });
  736. } else {
  737. StationsModule.runJob(
  738. "CALCULATE_SONG_FOR_STATION",
  739. {
  740. station
  741. },
  742. this
  743. )
  744. .then(newPlaylist => {
  745. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  746. .then(response => {
  747. station.playlist = newPlaylist;
  748. next(null, response.song, 0);
  749. })
  750. .catch(() => next(null, StationsModule.defaultSong, 0));
  751. })
  752. .catch(() => {
  753. next(null, StationsModule.defaultSong, 0);
  754. });
  755. }
  756. },
  757. (song, currentSongIndex, next) => {
  758. if (song) return next(null, true, currentSongIndex);
  759. return next(null, false);
  760. },
  761. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  762. );
  763. }
  764. },
  765. (song, currentSongIndex, station, next) => {
  766. const $set = {};
  767. if (song === null) $set.currentSong = null;
  768. else if (song.likes === -1 && song.dislikes === -1) {
  769. $set.currentSong = {
  770. songId: song.songId,
  771. title: song.title,
  772. duration: song.duration,
  773. skipDuration: 0,
  774. likes: -1,
  775. dislikes: -1
  776. };
  777. } else {
  778. $set.currentSong = {
  779. songId: song.songId,
  780. title: song.title,
  781. artists: song.artists,
  782. duration: song.duration,
  783. likes: song.likes,
  784. dislikes: song.dislikes,
  785. skipDuration: song.skipDuration,
  786. thumbnail: song.thumbnail
  787. };
  788. }
  789. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  790. $set.startedAt = Date.now();
  791. $set.timePaused = 0;
  792. if (station.paused) $set.pausedAt = Date.now();
  793. next(null, $set, station);
  794. },
  795. ($set, station, next) => {
  796. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  797. StationsModule.runJob(
  798. "UPDATE_STATION",
  799. {
  800. stationId: station._id
  801. },
  802. this
  803. )
  804. .then(station => {
  805. if (station.type === "community" && station.partyMode === true)
  806. CacheModule.runJob("PUB", {
  807. channel: "station.queueUpdate",
  808. value: payload.stationId
  809. })
  810. .then()
  811. .catch();
  812. next(null, station);
  813. })
  814. .catch(next);
  815. });
  816. }
  817. ],
  818. async (err, station) => {
  819. if (err) {
  820. err = await UtilsModule.runJob(
  821. "GET_ERROR",
  822. {
  823. error: err
  824. },
  825. this
  826. );
  827. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  828. reject(new Error(err));
  829. } else {
  830. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  831. station.currentSong.skipVotes = 0;
  832. }
  833. // TODO Pub/Sub this
  834. IOModule.runJob("EMIT_TO_ROOM", {
  835. room: `station.${station._id}`,
  836. args: [
  837. "event:songs.next",
  838. {
  839. currentSong: station.currentSong,
  840. startedAt: station.startedAt,
  841. paused: station.paused,
  842. timePaused: 0
  843. }
  844. ]
  845. })
  846. .then()
  847. .catch();
  848. if (station.privacy === "public") {
  849. IOModule.runJob("EMIT_TO_ROOM", {
  850. room: "home",
  851. args: ["event:station.nextSong", station._id, station.currentSong]
  852. })
  853. .then()
  854. .catch();
  855. } else {
  856. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  857. Object.keys(sockets).forEach(socketKey => {
  858. const socket = sockets[socketKey];
  859. const { session } = socket;
  860. if (session.sessionId) {
  861. CacheModule.runJob(
  862. "HGET",
  863. {
  864. table: "sessions",
  865. key: session.sessionId
  866. },
  867. this
  868. // eslint-disable-next-line no-loop-func
  869. ).then(session => {
  870. if (session) {
  871. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  872. userModel => {
  873. userModel.findOne(
  874. {
  875. _id: session.userId
  876. },
  877. (err, user) => {
  878. if (!err && user) {
  879. if (user.role === "admin")
  880. socket.emit(
  881. "event:station.nextSong",
  882. station._id,
  883. station.currentSong
  884. );
  885. else if (
  886. station.type === "community" &&
  887. station.owner === session.userId
  888. )
  889. socket.emit(
  890. "event:station.nextSong",
  891. station._id,
  892. station.currentSong
  893. );
  894. }
  895. }
  896. );
  897. }
  898. );
  899. }
  900. });
  901. }
  902. });
  903. }
  904. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  905. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  906. sockets: await IOModule.runJob(
  907. "GET_ROOM_SOCKETS",
  908. {
  909. room: `station.${station._id}`
  910. },
  911. this
  912. ),
  913. room: `song.${station.currentSong.songId}`
  914. });
  915. if (!station.paused) {
  916. NotificationsModule.runJob("SCHEDULE", {
  917. name: `stations.nextSong?id=${station._id}`,
  918. time: station.currentSong.duration * 1000,
  919. station
  920. });
  921. }
  922. } else {
  923. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  924. sockets: await IOModule.runJob(
  925. "GET_ROOM_SOCKETS",
  926. {
  927. room: `station.${station._id}`
  928. },
  929. this
  930. )
  931. })
  932. .then()
  933. .catch();
  934. }
  935. resolve({ station });
  936. }
  937. }
  938. );
  939. });
  940. }
  941. /**
  942. * Checks if a user can view/access a station
  943. *
  944. * @param {object} payload - object that contains the payload
  945. * @param {object} payload.station - the station object of the station in question
  946. * @param {string} payload.userId - the id of the user in question
  947. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  948. * @returns {Promise} - returns a promise (resolve, reject)
  949. */
  950. CAN_USER_VIEW_STATION(payload) {
  951. return new Promise((resolve, reject) => {
  952. async.waterfall(
  953. [
  954. next => {
  955. if (payload.station.privacy === "public") return next(true);
  956. if (payload.station.privacy === "unlisted")
  957. if (payload.hideUnlisted === true) return next();
  958. else return next(true);
  959. if (!payload.userId) return next("Not allowed");
  960. return next();
  961. },
  962. next => {
  963. DBModule.runJob(
  964. "GET_MODEL",
  965. {
  966. modelName: "user"
  967. },
  968. this
  969. ).then(userModel => {
  970. userModel.findOne({ _id: payload.userId }, next);
  971. });
  972. },
  973. (user, next) => {
  974. if (!user) return next("Not allowed");
  975. if (user.role === "admin") return next(true);
  976. if (payload.station.type === "official") return next("Not allowed");
  977. if (payload.station.owner === payload.userId) return next(true);
  978. return next("Not allowed");
  979. }
  980. ],
  981. async errOrResult => {
  982. if (errOrResult !== true && errOrResult !== "Not allowed") {
  983. errOrResult = await UtilsModule.runJob(
  984. "GET_ERROR",
  985. {
  986. error: errOrResult
  987. },
  988. this
  989. );
  990. reject(new Error(errOrResult));
  991. } else {
  992. resolve(errOrResult === true);
  993. }
  994. }
  995. );
  996. });
  997. }
  998. }
  999. export default new _StationsModule();