// stations.js
  1. import async from "async";
  2. import CoreClass from "../core";
  // Module-scope references shared by every job handler in this file.
  // StationsModule is assigned in the constructor; the others are assigned in initialize().
  let CacheModule;
  let DBModule;
  let NotificationsModule;
  let SongsModule;
  let StationsModule;
  let UtilsModule;
  9. class _StationsModule extends CoreClass {
  10. // eslint-disable-next-line require-jsdoc
  11. constructor() {
  12. super("stations");
  13. StationsModule = this;
  14. }
  15. /**
  16. * Initialises the stations module
  17. *
  18. * @returns {Promise} - returns promise (reject, resolve)
  19. */
  20. async initialize() {
  21. CacheModule = this.moduleManager.modules.cache;
  22. DBModule = this.moduleManager.modules.db;
  23. UtilsModule = this.moduleManager.modules.utils;
  24. SongsModule = this.moduleManager.modules.songs;
  25. NotificationsModule = this.moduleManager.modules.notifications;
  26. this.defaultSong = {
  27. songId: "60ItHLz5WEA",
  28. title: "Faded - Alan Walker",
  29. duration: 212,
  30. skipDuration: 0,
  31. likes: -1,
  32. dislikes: -1
  33. };
  34. // TEMP
  35. CacheModule.runJob("SUB", {
  36. channel: "station.pause",
  37. cb: async stationId => {
  38. NotificationsModule.runJob("REMOVE", {
  39. subscription: `stations.nextSong?id=${stationId}`
  40. }).then();
  41. }
  42. });
  43. CacheModule.runJob("SUB", {
  44. channel: "station.resume",
  45. cb: async stationId => {
  46. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  47. }
  48. });
  49. CacheModule.runJob("SUB", {
  50. channel: "station.queueUpdate",
  51. cb: async stationId => {
  52. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  53. if (!station.currentSong && station.queue.length > 0) {
  54. StationsModule.runJob("INITIALIZE_STATION", {
  55. stationId
  56. }).then();
  57. }
  58. });
  59. }
  60. });
  61. CacheModule.runJob("SUB", {
  62. channel: "station.newOfficialPlaylist",
  63. cb: async stationId => {
  64. CacheModule.runJob("HGET", {
  65. table: "officialPlaylists",
  66. key: stationId
  67. }).then(playlistObj => {
  68. if (playlistObj) {
  69. UtilsModule.runJob("EMIT_TO_ROOM", {
  70. room: `station.${stationId}`,
  71. args: ["event:newOfficialPlaylist", playlistObj.songs]
  72. });
  73. }
  74. });
  75. }
  76. });
  77. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  78. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  79. return new Promise((resolve, reject) =>
  80. async.waterfall(
  81. [
  82. next => {
  83. this.setStage(2);
  84. CacheModule.runJob("HGETALL", { table: "stations" })
  85. .then(stations => {
  86. next(null, stations);
  87. })
  88. .catch(next);
  89. },
  90. (stations, next) => {
  91. this.setStage(3);
  92. if (!stations) return next();
  93. const stationIds = Object.keys(stations);
  94. return async.each(
  95. stationIds,
  96. (stationId, next) => {
  97. stationModel.findOne({ _id: stationId }, (err, station) => {
  98. if (err) next(err);
  99. else if (!station) {
  100. CacheModule.runJob("HDEL", {
  101. table: "stations",
  102. key: stationId
  103. })
  104. .then(() => {
  105. next();
  106. })
  107. .catch(next);
  108. } else next();
  109. });
  110. },
  111. next
  112. );
  113. },
  114. next => {
  115. this.setStage(4);
  116. stationModel.find({}, next);
  117. },
  118. (stations, next) => {
  119. this.setStage(5);
  120. async.each(
  121. stations,
  122. (station, next2) => {
  123. async.waterfall(
  124. [
  125. next => {
  126. CacheModule.runJob("HSET", {
  127. table: "stations",
  128. key: station._id,
  129. value: stationSchema(station)
  130. })
  131. .then(station => next(null, station))
  132. .catch(next);
  133. },
  134. (station, next) => {
  135. StationsModule.runJob(
  136. "INITIALIZE_STATION",
  137. {
  138. stationId: station._id,
  139. bypassQueue: true
  140. },
  141. { bypassQueue: true }
  142. )
  143. .then(() => {
  144. next();
  145. })
  146. .catch(next); // bypassQueue is true because otherwise the module will never initialize
  147. }
  148. ],
  149. err => {
  150. next2(err);
  151. }
  152. );
  153. },
  154. next
  155. );
  156. }
  157. ],
  158. async err => {
  159. if (err) {
  160. err = await UtilsModule.runJob("GET_ERROR", {
  161. error: err
  162. });
  163. reject(new Error(err));
  164. } else {
  165. resolve();
  166. }
  167. }
  168. )
  169. );
  170. }
  171. /**
  172. * Initialises a station
  173. *
  174. * @param {object} payload - object that contains the payload
  175. * @param {string} payload.stationId - id of the station to initialise
  176. * @param {boolean} payload.bypassQueue - UNKNOWN
  177. * @returns {Promise} - returns a promise (resolve, reject)
  178. */
  179. INITIALIZE_STATION(payload) {
  180. return new Promise((resolve, reject) => {
  181. // if (typeof cb !== 'function') cb = ()=>{};
  182. async.waterfall(
  183. [
  184. next => {
  185. StationsModule.runJob(
  186. "GET_STATION",
  187. {
  188. stationId: payload.stationId,
  189. bypassQueue: payload.bypassQueue
  190. },
  191. { bypassQueue: payload.bypassQueue },
  192. this
  193. )
  194. .then(station => {
  195. next(null, station);
  196. })
  197. .catch(next);
  198. },
  199. (station, next) => {
  200. if (!station) return next("Station not found.");
  201. NotificationsModule.runJob(
  202. "UNSCHEDULE",
  203. {
  204. name: `stations.nextSong?id=${station._id}`
  205. },
  206. this
  207. )
  208. .then()
  209. .catch();
  210. NotificationsModule.runJob(
  211. "SUBSCRIBE",
  212. {
  213. name: `stations.nextSong?id=${station._id}`,
  214. cb: () =>
  215. StationsModule.runJob(
  216. "SKIP_STATION",
  217. {
  218. stationId: station._id
  219. },
  220. this
  221. ),
  222. unique: true,
  223. station
  224. },
  225. this
  226. )
  227. .then()
  228. .catch();
  229. if (station.paused) return next(true, station);
  230. return next(null, station);
  231. },
  232. (station, next) => {
  233. if (!station.currentSong) {
  234. return StationsModule.runJob(
  235. "SKIP_STATION",
  236. {
  237. stationId: station._id,
  238. bypassQueue: payload.bypassQueue
  239. },
  240. this
  241. )
  242. .then(station => {
  243. next(true, station);
  244. })
  245. .catch(next)
  246. .finally(() => {});
  247. }
  248. let timeLeft =
  249. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  250. if (Number.isNaN(timeLeft)) timeLeft = -1;
  251. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  252. return StationsModule.runJob(
  253. "SKIP_STATION",
  254. {
  255. stationId: station._id,
  256. bypassQueue: payload.bypassQueue
  257. },
  258. this
  259. )
  260. .then(station => {
  261. next(null, station);
  262. })
  263. .catch(next);
  264. }
  265. // name, time, cb, station
  266. NotificationsModule.runJob(
  267. "SCHEDULE",
  268. {
  269. name: `stations.nextSong?id=${station._id}`,
  270. time: timeLeft,
  271. station
  272. },
  273. this
  274. );
  275. return next(null, station);
  276. }
  277. ],
  278. async (err, station) => {
  279. if (err && err !== true) {
  280. err = await UtilsModule.runJob(
  281. "GET_ERROR",
  282. {
  283. error: err
  284. },
  285. this
  286. );
  287. reject(new Error(err));
  288. } else resolve(station);
  289. }
  290. );
  291. });
  292. }
  293. /**
  294. * Calculates the next song for the station
  295. *
  296. * @param {object} payload - object that contains the payload
  297. * @param {string} payload.station - station object to calculate song for
  298. * @param {boolean} payload.bypassQueue - UNKNOWN
  299. * @returns {Promise} - returns a promise (resolve, reject)
  300. */
  301. async CALCULATE_SONG_FOR_STATION(payload) {
  302. // station, bypassValidate = false
  303. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  304. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  305. return new Promise((resolve, reject) => {
  306. const songList = [];
  307. return async.waterfall(
  308. [
  309. next => {
  310. if (payload.station.genres.length === 0) return next();
  311. const genresDone = [];
  312. return payload.station.genres.forEach(genre => {
  313. songModel.find({ genres: genre }, (err, songs) => {
  314. if (!err) {
  315. songs.forEach(song => {
  316. if (songList.indexOf(song._id) === -1) {
  317. let found = false;
  318. song.genres.forEach(songGenre => {
  319. if (payload.station.blacklistedGenres.indexOf(songGenre) !== -1)
  320. found = true;
  321. });
  322. if (!found) {
  323. songList.push(song._id);
  324. }
  325. }
  326. });
  327. }
  328. genresDone.push(genre);
  329. if (genresDone.length === payload.station.genres.length) next();
  330. });
  331. });
  332. },
  333. next => {
  334. const playlist = [];
  335. songList.forEach(songId => {
  336. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  337. });
  338. // eslint-disable-next-line array-callback-return
  339. payload.station.playlist.filter(songId => {
  340. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  341. });
  342. UtilsModule.runJob("SHUFFLE", { array: playlist })
  343. .then(result => {
  344. next(null, result.array);
  345. }, this)
  346. .catch(next);
  347. },
  348. (playlist, next) => {
  349. StationsModule.runJob(
  350. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  351. {
  352. stationId: payload.station._id,
  353. songList: playlist,
  354. bypassQueue: payload.bypassQueue
  355. },
  356. this
  357. )
  358. .then(() => {
  359. next(null, playlist);
  360. })
  361. .catch(next);
  362. },
  363. (playlist, next) => {
  364. stationModel.updateOne(
  365. { _id: payload.station._id },
  366. { $set: { playlist } },
  367. { runValidators: true },
  368. () => {
  369. StationsModule.runJob(
  370. "UPDATE_STATION",
  371. {
  372. stationId: payload.station._id,
  373. bypassQueue: payload.bypassQueue
  374. },
  375. this
  376. )
  377. .then(() => {
  378. next(null, playlist);
  379. })
  380. .catch(next);
  381. }
  382. );
  383. }
  384. ],
  385. (err, newPlaylist) => {
  386. if (err) return reject(new Error(err));
  387. return resolve(newPlaylist);
  388. }
  389. );
  390. });
  391. }
  392. /**
  393. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  394. *
  395. * @param {object} payload - object that contains the payload
  396. * @param {string} payload.stationId - id of the station
  397. * @returns {Promise} - returns a promise (resolve, reject)
  398. */
  399. GET_STATION(payload) {
  400. return new Promise((resolve, reject) => {
  401. async.waterfall(
  402. [
  403. next => {
  404. CacheModule.runJob(
  405. "HGET",
  406. {
  407. table: "stations",
  408. key: payload.stationId
  409. },
  410. this
  411. )
  412. .then(station => {
  413. next(null, station);
  414. })
  415. .catch(next);
  416. },
  417. (station, next) => {
  418. if (station) return next(true, station);
  419. return this.stationModel.findOne({ _id: payload.stationId }, next);
  420. },
  421. (station, next) => {
  422. if (station) {
  423. if (station.type === "official") {
  424. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  425. stationId: station._id,
  426. songList: station.playlist
  427. })
  428. .then()
  429. .catch();
  430. }
  431. station = this.stationSchema(station);
  432. CacheModule.runJob("HSET", {
  433. table: "stations",
  434. key: payload.stationId,
  435. value: station
  436. })
  437. .then()
  438. .catch();
  439. next(true, station);
  440. } else next("Station not found");
  441. }
  442. ],
  443. async (err, station) => {
  444. if (err && err !== true) {
  445. err = await UtilsModule.runJob(
  446. "GET_ERROR",
  447. {
  448. error: err
  449. },
  450. this
  451. );
  452. reject(new Error(err));
  453. } else resolve(station);
  454. }
  455. );
  456. });
  457. }
  458. /**
  459. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  460. *
  461. * @param {object} payload - object that contains the payload
  462. * @param {string} payload.stationName - the unique name of the station
  463. * @returns {Promise} - returns a promise (resolve, reject)
  464. */
  465. async GET_STATION_BY_NAME(payload) {
  466. const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
  467. return new Promise((resolve, reject) =>
  468. async.waterfall(
  469. [
  470. next => {
  471. stationModel.findOne({ name: payload.stationName }, next);
  472. },
  473. (station, next) => {
  474. if (station) {
  475. if (station.type === "official") {
  476. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  477. stationId: station._id,
  478. songList: station.playlist
  479. });
  480. }
  481. CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }, this).then(stationSchema => {
  482. station = stationSchema(station);
  483. CacheModule.runJob("HSET", {
  484. table: "stations",
  485. key: station._id,
  486. value: station
  487. });
  488. next(true, station);
  489. });
  490. } else next("Station not found");
  491. }
  492. ],
  493. (err, station) => {
  494. if (err && err !== true) return reject(new Error(err));
  495. return resolve(station);
  496. }
  497. )
  498. );
  499. }
  500. /**
  501. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  502. *
  503. * @param {object} payload - object that contains the payload
  504. * @param {string} payload.stationId - the id of the station to update
  505. * @returns {Promise} - returns a promise (resolve, reject)
  506. */
  507. UPDATE_STATION(payload) {
  508. return new Promise((resolve, reject) => {
  509. async.waterfall(
  510. [
  511. next => {
  512. this.stationModel.findOne({ _id: payload.stationId }, next);
  513. },
  514. (station, next) => {
  515. if (!station) {
  516. CacheModule.runJob("HDEL", {
  517. table: "stations",
  518. key: payload.stationId
  519. })
  520. .then()
  521. .catch();
  522. return next("Station not found");
  523. }
  524. return CacheModule.runJob(
  525. "HSET",
  526. {
  527. table: "stations",
  528. key: payload.stationId,
  529. value: station
  530. },
  531. this
  532. )
  533. .then(station => {
  534. next(null, station);
  535. })
  536. .catch(next);
  537. }
  538. ],
  539. async (err, station) => {
  540. if (err && err !== true) {
  541. err = await UtilsModule.runJob(
  542. "GET_ERROR",
  543. {
  544. error: err
  545. },
  546. this
  547. );
  548. reject(new Error(err));
  549. } else resolve(station);
  550. }
  551. );
  552. });
  553. }
  554. /**
  555. * Creates the official playlist for a station
  556. *
  557. * @param {object} payload - object that contains the payload
  558. * @param {string} payload.stationId - the id of the station
  559. * @param {Array} payload.songList - list of songs to put in official playlist
  560. * @returns {Promise} - returns a promise (resolve, reject)
  561. */
  562. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  563. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  564. console.log(typeof payload.songList, payload.songList);
  565. return new Promise(resolve => {
  566. const lessInfoPlaylist = [];
  567. return async.each(
  568. payload.songList,
  569. (song, next) => {
  570. SongsModule.runJob("GET_SONG", { id: song }, this)
  571. .then(response => {
  572. const { song } = response;
  573. if (song) {
  574. const newSong = {
  575. songId: song.songId,
  576. title: song.title,
  577. artists: song.artists,
  578. duration: song.duration
  579. };
  580. lessInfoPlaylist.push(newSong);
  581. }
  582. })
  583. .finally(() => {
  584. next();
  585. });
  586. },
  587. () => {
  588. CacheModule.runJob(
  589. "HSET",
  590. {
  591. table: "officialPlaylists",
  592. key: payload.stationId,
  593. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  594. },
  595. this
  596. ).finally(() => {
  597. CacheModule.runJob("PUB", {
  598. channel: "station.newOfficialPlaylist",
  599. value: payload.stationId
  600. });
  601. resolve();
  602. });
  603. }
  604. );
  605. });
  606. }
  607. /**
  608. * Skips a station
  609. *
  610. * @param {object} payload - object that contains the payload
  611. * @param {string} payload.stationId - the id of the station to skip
  612. * @param {boolean} payload.bypassQueue - UNKNOWN
  613. * @returns {Promise} - returns a promise (resolve, reject)
  614. */
  615. SKIP_STATION(payload) {
  616. return new Promise((resolve, reject) => {
  617. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  618. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  619. async.waterfall(
  620. [
  621. next => {
  622. StationsModule.runJob(
  623. "GET_STATION",
  624. {
  625. stationId: payload.stationId,
  626. bypassQueue: payload.bypassQueue
  627. },
  628. this
  629. )
  630. .then(station => {
  631. next(null, station);
  632. })
  633. .catch(() => {});
  634. },
  635. // eslint-disable-next-line consistent-return
  636. (station, next) => {
  637. if (!station) return next("Station not found.");
  638. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  639. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  640. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  641. // Community station with party mode enabled and songs in the queue
  642. if (station.paused) return next(null, null, -19, station);
  643. return this.stationModel.updateOne(
  644. { _id: payload.stationId },
  645. {
  646. $pull: {
  647. queue: {
  648. _id: station.queue[0]._id
  649. }
  650. }
  651. },
  652. err => {
  653. if (err) return next(err);
  654. return next(null, station.queue[0], -12, station);
  655. }
  656. );
  657. }
  658. if (station.type === "community" && !station.partyMode) {
  659. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  660. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  661. if (err) return next(err);
  662. if (!playlist) return next(null, null, -13, station);
  663. playlist = playlist.songs;
  664. if (playlist.length > 0) {
  665. let currentSongIndex;
  666. if (station.currentSongIndex < playlist.length - 1)
  667. currentSongIndex = station.currentSongIndex + 1;
  668. else currentSongIndex = 0;
  669. const callback = (err, song) => {
  670. if (err) return next(err);
  671. if (song) return next(null, song, currentSongIndex, station);
  672. const currentSong = {
  673. songId: playlist[currentSongIndex].songId,
  674. title: playlist[currentSongIndex].title,
  675. duration: playlist[currentSongIndex].duration,
  676. likes: -1,
  677. dislikes: -1
  678. };
  679. return next(null, currentSong, currentSongIndex, station);
  680. };
  681. if (playlist[currentSongIndex]._id)
  682. return SongsModule.runJob(
  683. "GET_SONG",
  684. {
  685. id: playlist[currentSongIndex]._id
  686. },
  687. this
  688. )
  689. .then(response => callback(null, response.song))
  690. .catch(callback);
  691. return SongsModule.runJob(
  692. "GET_SONG_FROM_ID",
  693. {
  694. songId: playlist[currentSongIndex].songId
  695. },
  696. this
  697. )
  698. .then(response => callback(null, response.song))
  699. .catch(callback);
  700. }
  701. return next(null, null, -14, station);
  702. })
  703. );
  704. }
  705. if (station.type === "official" && station.playlist.length === 0) {
  706. return StationsModule.runJob(
  707. "CALCULATE_SONG_FOR_STATION",
  708. { station, bypassQueue: payload.bypassQueue },
  709. this
  710. )
  711. .then(playlist => {
  712. if (playlist.length === 0) return next(null, this.defaultSong, 0, station);
  713. return SongsModule.runJob(
  714. "GET_SONG",
  715. {
  716. id: playlist[0]
  717. },
  718. this
  719. )
  720. .then(response => {
  721. next(null, response.song, 0, station);
  722. })
  723. .catch(() => next(null, this.defaultSong, 0, station));
  724. })
  725. .catch(next);
  726. }
  727. if (station.type === "official" && station.playlist.length > 0) {
  728. return async.doUntil(
  729. next => {
  730. if (station.currentSongIndex < station.playlist.length - 1) {
  731. SongsModule.runJob(
  732. "GET_SONG",
  733. {
  734. id: station.playlist[station.currentSongIndex + 1]
  735. },
  736. this
  737. )
  738. .then(response => next(null, response.song, station.currentSongIndex + 1))
  739. .catch(() => {
  740. station.currentSongIndex += 1;
  741. next(null, null, null);
  742. });
  743. } else {
  744. StationsModule.runJob(
  745. "CALCULATE_SONG_FOR_STATION",
  746. {
  747. station,
  748. bypassQueue: payload.bypassQueue
  749. },
  750. this
  751. )
  752. .then(newPlaylist => {
  753. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  754. .then(response => {
  755. station.playlist = newPlaylist;
  756. next(null, response.song, 0);
  757. })
  758. .catch(() => next(null, this.defaultSong, 0));
  759. })
  760. .catch(() => {
  761. next(null, this.defaultSong, 0);
  762. });
  763. }
  764. },
  765. (song, currentSongIndex, next) => {
  766. if (song) return next(null, true, currentSongIndex);
  767. return next(null, false);
  768. },
  769. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  770. );
  771. }
  772. },
  773. (song, currentSongIndex, station, next) => {
  774. const $set = {};
  775. if (song === null) $set.currentSong = null;
  776. else if (song.likes === -1 && song.dislikes === -1) {
  777. $set.currentSong = {
  778. songId: song.songId,
  779. title: song.title,
  780. duration: song.duration,
  781. skipDuration: 0,
  782. likes: -1,
  783. dislikes: -1
  784. };
  785. } else {
  786. $set.currentSong = {
  787. songId: song.songId,
  788. title: song.title,
  789. artists: song.artists,
  790. duration: song.duration,
  791. likes: song.likes,
  792. dislikes: song.dislikes,
  793. skipDuration: song.skipDuration,
  794. thumbnail: song.thumbnail
  795. };
  796. }
  797. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  798. $set.startedAt = Date.now();
  799. $set.timePaused = 0;
  800. if (station.paused) $set.pausedAt = Date.now();
  801. next(null, $set, station);
  802. },
  803. ($set, station, next) => {
  804. this.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  805. StationsModule.runJob(
  806. "UPDATE_STATION",
  807. {
  808. stationId: station._id,
  809. bypassQueue: payload.bypassQueue
  810. },
  811. this
  812. )
  813. .then(station => {
  814. if (station.type === "community" && station.partyMode === true)
  815. CacheModule.runJob("PUB", {
  816. channel: "station.queueUpdate",
  817. value: payload.stationId
  818. })
  819. .then()
  820. .catch();
  821. next(null, station);
  822. })
  823. .catch(next);
  824. });
  825. }
  826. ],
  827. async (err, station) => {
  828. if (err) {
  829. err = await UtilsModule.runJob(
  830. "GET_ERROR",
  831. {
  832. error: err
  833. },
  834. this
  835. );
  836. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  837. reject(new Error(err));
  838. } else {
  839. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  840. station.currentSong.skipVotes = 0;
  841. }
  842. // TODO Pub/Sub this
  843. UtilsModule.runJob("EMIT_TO_ROOM", {
  844. room: `station.${station._id}`,
  845. args: [
  846. "event:songs.next",
  847. {
  848. currentSong: station.currentSong,
  849. startedAt: station.startedAt,
  850. paused: station.paused,
  851. timePaused: 0
  852. }
  853. ]
  854. })
  855. .then()
  856. .catch();
  857. if (station.privacy === "public") {
  858. UtilsModule.runJob("EMIT_TO_ROOM", {
  859. room: "home",
  860. args: ["event:station.nextSong", station._id, station.currentSong]
  861. })
  862. .then()
  863. .catch();
  864. } else {
  865. const sockets = await UtilsModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  866. for (
  867. let socketId = 0, socketKeys = Object.keys(sockets);
  868. socketId < socketKeys.length;
  869. socketId += 1
  870. ) {
  871. const socket = sockets[socketId];
  872. const { session } = sockets[socketId];
  873. if (session.sessionId) {
  874. CacheModule.runJob(
  875. "HGET",
  876. {
  877. table: "sessions",
  878. key: session.sessionId
  879. },
  880. this
  881. ).then(session => {
  882. if (session) {
  883. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  884. userModel => {
  885. userModel.findOne(
  886. {
  887. _id: session.userId
  888. },
  889. (err, user) => {
  890. if (!err && user) {
  891. if (user.role === "admin")
  892. socket.emit(
  893. "event:station.nextSong",
  894. station._id,
  895. station.currentSong
  896. );
  897. else if (
  898. station.type === "community" &&
  899. station.owner === session.userId
  900. )
  901. socket.emit(
  902. "event:station.nextSong",
  903. station._id,
  904. station.currentSong
  905. );
  906. }
  907. }
  908. );
  909. }
  910. );
  911. }
  912. });
  913. }
  914. }
  915. }
  916. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  917. UtilsModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  918. sockets: await UtilsModule.runJob(
  919. "GET_ROOM_SOCKETS",
  920. {
  921. room: `station.${station._id}`
  922. },
  923. this
  924. ),
  925. room: `song.${station.currentSong.songId}`
  926. });
  927. if (!station.paused) {
  928. NotificationsModule.runJob("SCHEDULE", {
  929. name: `stations.nextSong?id=${station._id}`,
  930. time: station.currentSong.duration * 1000,
  931. station
  932. });
  933. }
  934. } else {
  935. UtilsModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  936. sockets: await UtilsModule.runJob(
  937. "GET_ROOM_SOCKETS",
  938. {
  939. room: `station.${station._id}`
  940. },
  941. this
  942. )
  943. })
  944. .then()
  945. .catch();
  946. }
  947. resolve({ station });
  948. }
  949. }
  950. );
  951. });
  952. }
  953. /**
  954. * Checks if a user can view/access a station
  955. *
  956. * @param {object} payload - object that contains the payload
  957. * @param {object} payload.station - the station object of the station in question
  958. * @param {string} payload.userId - the id of the user in question
  959. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  960. * @returns {Promise} - returns a promise (resolve, reject)
  961. */
  962. CAN_USER_VIEW_STATION(payload) {
  963. return new Promise((resolve, reject) => {
  964. async.waterfall(
  965. [
  966. next => {
  967. if (payload.station.privacy === "public") return next(true);
  968. if (payload.station.privacy === "unlisted")
  969. if (payload.hideUnlisted === true) return next();
  970. else return next(true);
  971. if (!payload.userId) return next("Not allowed");
  972. return next();
  973. },
  974. next => {
  975. DBModule.runJob(
  976. "GET_MODEL",
  977. {
  978. modelName: "user"
  979. },
  980. this
  981. ).then(userModel => {
  982. userModel.findOne({ _id: payload.userId }, next);
  983. });
  984. },
  985. (user, next) => {
  986. if (!user) return next("Not allowed");
  987. if (user.role === "admin") return next(true);
  988. if (payload.station.type === "official") return next("Not allowed");
  989. if (payload.station.owner === payload.userId) return next(true);
  990. return next("Not allowed");
  991. }
  992. ],
  993. async errOrResult => {
  994. if (errOrResult !== true && errOrResult !== "Not allowed") {
  995. errOrResult = await UtilsModule.runJob(
  996. "GET_ERROR",
  997. {
  998. error: errOrResult
  999. },
  1000. this
  1001. );
  1002. reject(new Error(errOrResult));
  1003. } else {
  1004. resolve(errOrResult === true);
  1005. }
  1006. }
  1007. );
  1008. });
  1009. }
  1010. }
  // The module is exported as one instance created when this file is first loaded.
  const stationsModule = new _StationsModule();

  export default stationsModule;