stations.js 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076
  1. import async from "async";
  2. import CoreClass from "../core";
  // Module handles shared by every method in this file. StationsModule is assigned in the
  // constructor; the others are looked up from the module manager in initialize().
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let SongsModule;
  8. let NotificationsModule;
  9. class _StationsModule extends CoreClass {
  10. // eslint-disable-next-line require-jsdoc
  11. constructor() {
  12. super("stations");
  13. StationsModule = this;
  14. }
  15. /**
  16. * Initialises the stations module
  17. *
  18. * @returns {Promise} - returns promise (reject, resolve)
  19. */
  20. async initialize() {
  21. CacheModule = this.moduleManager.modules.cache;
  22. DBModule = this.moduleManager.modules.db;
  23. UtilsModule = this.moduleManager.modules.utils;
  24. SongsModule = this.moduleManager.modules.songs;
  25. NotificationsModule = this.moduleManager.modules.notifications;
  26. this.defaultSong = {
  27. songId: "60ItHLz5WEA",
  28. title: "Faded - Alan Walker",
  29. duration: 212,
  30. skipDuration: 0,
  31. likes: -1,
  32. dislikes: -1
  33. };
  34. // TEMP
  35. CacheModule.runJob("SUB", {
  36. channel: "station.pause",
  37. cb: async stationId => {
  38. NotificationsModule.runJob("REMOVE", {
  39. subscription: `stations.nextSong?id=${stationId}`
  40. }).then();
  41. }
  42. });
  43. CacheModule.runJob("SUB", {
  44. channel: "station.resume",
  45. cb: async stationId => {
  46. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  47. }
  48. });
  49. CacheModule.runJob("SUB", {
  50. channel: "station.queueUpdate",
  51. cb: async stationId => {
  52. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  53. if (!station.currentSong && station.queue.length > 0) {
  54. StationsModule.runJob("INITIALIZE_STATION", {
  55. stationId
  56. }).then();
  57. }
  58. });
  59. }
  60. });
  61. CacheModule.runJob("SUB", {
  62. channel: "station.newOfficialPlaylist",
  63. cb: async stationId => {
  64. CacheModule.runJob("HGET", {
  65. table: "officialPlaylists",
  66. key: stationId
  67. }).then(playlistObj => {
  68. if (playlistObj) {
  69. UtilsModule.runJob("EMIT_TO_ROOM", {
  70. room: `station.${stationId}`,
  71. args: ["event:newOfficialPlaylist", playlistObj.songs]
  72. });
  73. }
  74. });
  75. }
  76. });
  77. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  78. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  79. return new Promise((resolve, reject) =>
  80. async.waterfall(
  81. [
  82. next => {
  83. this.setStage(2);
  84. CacheModule.runJob("HGETALL", { table: "stations" })
  85. .then(stations => {
  86. next(null, stations);
  87. })
  88. .catch(next);
  89. },
  90. (stations, next) => {
  91. this.setStage(3);
  92. if (!stations) return next();
  93. const stationIds = Object.keys(stations);
  94. return async.each(
  95. stationIds,
  96. (stationId, next) => {
  97. stationModel.findOne({ _id: stationId }, (err, station) => {
  98. if (err) next(err);
  99. else if (!station) {
  100. CacheModule.runJob("HDEL", {
  101. table: "stations",
  102. key: stationId
  103. })
  104. .then(() => {
  105. next();
  106. })
  107. .catch(next);
  108. } else next();
  109. });
  110. },
  111. next
  112. );
  113. },
  114. next => {
  115. this.setStage(4);
  116. stationModel.find({}, next);
  117. },
  118. (stations, next) => {
  119. this.setStage(5);
  120. async.each(
  121. stations,
  122. (station, next2) => {
  123. async.waterfall(
  124. [
  125. next => {
  126. CacheModule.runJob("HSET", {
  127. table: "stations",
  128. key: station._id,
  129. value: stationSchema(station)
  130. })
  131. .then(station => next(null, station))
  132. .catch(next);
  133. },
  134. (station, next) => {
  135. StationsModule.runJob(
  136. "INITIALIZE_STATION",
  137. {
  138. stationId: station._id
  139. },
  140. null,
  141. -1
  142. )
  143. .then(() => {
  144. next();
  145. })
  146. .catch(next);
  147. }
  148. ],
  149. err => {
  150. next2(err);
  151. }
  152. );
  153. },
  154. next
  155. );
  156. }
  157. ],
  158. async err => {
  159. if (err) {
  160. err = await UtilsModule.runJob("GET_ERROR", {
  161. error: err
  162. });
  163. reject(new Error(err));
  164. } else {
  165. resolve();
  166. }
  167. }
  168. )
  169. );
  170. }
  171. /**
  172. * Initialises a station
  173. *
  174. * @param {object} payload - object that contains the payload
  175. * @param {string} payload.stationId - id of the station to initialise
  176. * @returns {Promise} - returns a promise (resolve, reject)
  177. */
  178. INITIALIZE_STATION(payload) {
  179. return new Promise((resolve, reject) => {
  180. // if (typeof cb !== 'function') cb = ()=>{};
  181. async.waterfall(
  182. [
  183. next => {
  184. StationsModule.runJob(
  185. "GET_STATION",
  186. {
  187. stationId: payload.stationId
  188. },
  189. this
  190. )
  191. .then(station => {
  192. next(null, station);
  193. })
  194. .catch(next);
  195. },
  196. (station, next) => {
  197. if (!station) return next("Station not found.");
  198. NotificationsModule.runJob(
  199. "UNSCHEDULE",
  200. {
  201. name: `stations.nextSong?id=${station._id}`
  202. },
  203. this
  204. )
  205. .then()
  206. .catch();
  207. NotificationsModule.runJob(
  208. "SUBSCRIBE",
  209. {
  210. name: `stations.nextSong?id=${station._id}`,
  211. cb: () =>
  212. StationsModule.runJob(
  213. "SKIP_STATION",
  214. {
  215. stationId: station._id
  216. },
  217. this
  218. ),
  219. unique: true,
  220. station
  221. },
  222. this
  223. )
  224. .then()
  225. .catch();
  226. if (station.paused) return next(true, station);
  227. return next(null, station);
  228. },
  229. (station, next) => {
  230. if (!station.currentSong) {
  231. return StationsModule.runJob(
  232. "SKIP_STATION",
  233. {
  234. stationId: station._id
  235. },
  236. this
  237. )
  238. .then(station => {
  239. next(true, station);
  240. })
  241. .catch(next)
  242. .finally(() => {});
  243. }
  244. let timeLeft =
  245. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  246. if (Number.isNaN(timeLeft)) timeLeft = -1;
  247. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  248. return StationsModule.runJob(
  249. "SKIP_STATION",
  250. {
  251. stationId: station._id
  252. },
  253. this
  254. )
  255. .then(station => {
  256. next(null, station);
  257. })
  258. .catch(next);
  259. }
  260. // name, time, cb, station
  261. NotificationsModule.runJob(
  262. "SCHEDULE",
  263. {
  264. name: `stations.nextSong?id=${station._id}`,
  265. time: timeLeft,
  266. station
  267. },
  268. this
  269. );
  270. return next(null, station);
  271. }
  272. ],
  273. async (err, station) => {
  274. if (err && err !== true) {
  275. err = await UtilsModule.runJob(
  276. "GET_ERROR",
  277. {
  278. error: err
  279. },
  280. this
  281. );
  282. reject(new Error(err));
  283. } else resolve(station);
  284. }
  285. );
  286. });
  287. }
  288. /**
  289. * Calculates the next song for the station
  290. *
  291. * @param {object} payload - object that contains the payload
  292. * @param {string} payload.station - station object to calculate song for
  293. * @returns {Promise} - returns a promise (resolve, reject)
  294. */
  295. async CALCULATE_SONG_FOR_STATION(payload) {
  296. // station, bypassValidate = false
  297. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  298. return new Promise((resolve, reject) => {
  299. const songList = [];
  300. return async.waterfall(
  301. [
  302. next => {
  303. if (payload.station.genres.length === 0) return next();
  304. const genresDone = [];
  305. return payload.station.genres.forEach(genre => {
  306. songModel.find({ genres: genre }, (err, songs) => {
  307. if (!err) {
  308. songs.forEach(song => {
  309. if (songList.indexOf(song._id) === -1) {
  310. let found = false;
  311. song.genres.forEach(songGenre => {
  312. if (payload.station.blacklistedGenres.indexOf(songGenre) !== -1)
  313. found = true;
  314. });
  315. if (!found) {
  316. songList.push(song._id);
  317. }
  318. }
  319. });
  320. }
  321. genresDone.push(genre);
  322. if (genresDone.length === payload.station.genres.length) next();
  323. });
  324. });
  325. },
  326. next => {
  327. const playlist = [];
  328. songList.forEach(songId => {
  329. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  330. });
  331. // eslint-disable-next-line array-callback-return
  332. payload.station.playlist.filter(songId => {
  333. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  334. });
  335. UtilsModule.runJob("SHUFFLE", { array: playlist })
  336. .then(result => {
  337. next(null, result.array);
  338. }, this)
  339. .catch(next);
  340. },
  341. (playlist, next) => {
  342. StationsModule.runJob(
  343. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  344. {
  345. stationId: payload.station._id,
  346. songList: playlist
  347. },
  348. this
  349. )
  350. .then(() => {
  351. next(null, playlist);
  352. })
  353. .catch(next);
  354. },
  355. (playlist, next) => {
  356. StationsModule.stationModel.updateOne(
  357. { _id: payload.station._id },
  358. { $set: { playlist } },
  359. { runValidators: true },
  360. () => {
  361. StationsModule.runJob(
  362. "UPDATE_STATION",
  363. {
  364. stationId: payload.station._id
  365. },
  366. this
  367. )
  368. .then(() => {
  369. next(null, playlist);
  370. })
  371. .catch(next);
  372. }
  373. );
  374. }
  375. ],
  376. (err, newPlaylist) => {
  377. if (err) return reject(new Error(err));
  378. return resolve(newPlaylist);
  379. }
  380. );
  381. });
  382. }
  383. /**
  384. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  385. *
  386. * @param {object} payload - object that contains the payload
  387. * @param {string} payload.stationId - id of the station
  388. * @returns {Promise} - returns a promise (resolve, reject)
  389. */
  390. GET_STATION(payload) {
  391. return new Promise((resolve, reject) => {
  392. async.waterfall(
  393. [
  394. next => {
  395. CacheModule.runJob(
  396. "HGET",
  397. {
  398. table: "stations",
  399. key: payload.stationId
  400. },
  401. this
  402. )
  403. .then(station => {
  404. next(null, station);
  405. })
  406. .catch(next);
  407. },
  408. (station, next) => {
  409. if (station) return next(true, station);
  410. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  411. },
  412. (station, next) => {
  413. if (station) {
  414. if (station.type === "official") {
  415. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  416. stationId: station._id,
  417. songList: station.playlist
  418. })
  419. .then()
  420. .catch();
  421. }
  422. station = this.stationSchema(station);
  423. CacheModule.runJob("HSET", {
  424. table: "stations",
  425. key: payload.stationId,
  426. value: station
  427. })
  428. .then()
  429. .catch();
  430. next(true, station);
  431. } else next("Station not found");
  432. }
  433. ],
  434. async (err, station) => {
  435. if (err && err !== true) {
  436. err = await UtilsModule.runJob(
  437. "GET_ERROR",
  438. {
  439. error: err
  440. },
  441. this
  442. );
  443. reject(new Error(err));
  444. } else resolve(station);
  445. }
  446. );
  447. });
  448. }
  449. /**
  450. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  451. *
  452. * @param {object} payload - object that contains the payload
  453. * @param {string} payload.stationName - the unique name of the station
  454. * @returns {Promise} - returns a promise (resolve, reject)
  455. */
  456. async GET_STATION_BY_NAME(payload) {
  457. return new Promise((resolve, reject) =>
  458. async.waterfall(
  459. [
  460. next => {
  461. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  462. },
  463. (station, next) => {
  464. if (station) {
  465. if (station.type === "official") {
  466. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  467. stationId: station._id,
  468. songList: station.playlist
  469. });
  470. }
  471. CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }, this).then(stationSchema => {
  472. station = stationSchema(station);
  473. CacheModule.runJob("HSET", {
  474. table: "stations",
  475. key: station._id,
  476. value: station
  477. });
  478. next(true, station);
  479. });
  480. } else next("Station not found");
  481. }
  482. ],
  483. (err, station) => {
  484. if (err && err !== true) return reject(new Error(err));
  485. return resolve(station);
  486. }
  487. )
  488. );
  489. }
  490. /**
  491. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  492. *
  493. * @param {object} payload - object that contains the payload
  494. * @param {string} payload.stationId - the id of the station to update
  495. * @returns {Promise} - returns a promise (resolve, reject)
  496. */
  497. UPDATE_STATION(payload) {
  498. return new Promise((resolve, reject) => {
  499. async.waterfall(
  500. [
  501. next => {
  502. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  503. },
  504. (station, next) => {
  505. if (!station) {
  506. CacheModule.runJob("HDEL", {
  507. table: "stations",
  508. key: payload.stationId
  509. })
  510. .then()
  511. .catch();
  512. return next("Station not found");
  513. }
  514. return CacheModule.runJob(
  515. "HSET",
  516. {
  517. table: "stations",
  518. key: payload.stationId,
  519. value: station
  520. },
  521. this
  522. )
  523. .then(station => {
  524. next(null, station);
  525. })
  526. .catch(next);
  527. }
  528. ],
  529. async (err, station) => {
  530. if (err && err !== true) {
  531. err = await UtilsModule.runJob(
  532. "GET_ERROR",
  533. {
  534. error: err
  535. },
  536. this
  537. );
  538. reject(new Error(err));
  539. } else resolve(station);
  540. }
  541. );
  542. });
  543. }
  544. /**
  545. * Creates the official playlist for a station
  546. *
  547. * @param {object} payload - object that contains the payload
  548. * @param {string} payload.stationId - the id of the station
  549. * @param {Array} payload.songList - list of songs to put in official playlist
  550. * @returns {Promise} - returns a promise (resolve, reject)
  551. */
  552. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  553. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  554. console.log(typeof payload.songList, payload.songList);
  555. return new Promise(resolve => {
  556. const lessInfoPlaylist = [];
  557. return async.each(
  558. payload.songList,
  559. (song, next) => {
  560. SongsModule.runJob("GET_SONG", { id: song }, this)
  561. .then(response => {
  562. const { song } = response;
  563. if (song) {
  564. const newSong = {
  565. songId: song.songId,
  566. title: song.title,
  567. artists: song.artists,
  568. duration: song.duration
  569. };
  570. lessInfoPlaylist.push(newSong);
  571. }
  572. })
  573. .finally(() => {
  574. next();
  575. });
  576. },
  577. () => {
  578. CacheModule.runJob(
  579. "HSET",
  580. {
  581. table: "officialPlaylists",
  582. key: payload.stationId,
  583. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  584. },
  585. this
  586. ).finally(() => {
  587. CacheModule.runJob("PUB", {
  588. channel: "station.newOfficialPlaylist",
  589. value: payload.stationId
  590. });
  591. resolve();
  592. });
  593. }
  594. );
  595. });
  596. }
  597. /**
  598. * Skips a station
  599. *
  600. * @param {object} payload - object that contains the payload
  601. * @param {string} payload.stationId - the id of the station to skip
  602. * @returns {Promise} - returns a promise (resolve, reject)
  603. */
  604. SKIP_STATION(payload) {
  605. return new Promise((resolve, reject) => {
  606. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  607. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  608. async.waterfall(
  609. [
  610. next => {
  611. StationsModule.runJob(
  612. "GET_STATION",
  613. {
  614. stationId: payload.stationId
  615. },
  616. this
  617. )
  618. .then(station => {
  619. next(null, station);
  620. })
  621. .catch(() => {});
  622. },
  623. // eslint-disable-next-line consistent-return
  624. (station, next) => {
  625. if (!station) return next("Station not found.");
  626. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  627. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  628. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  629. // Community station with party mode enabled and songs in the queue
  630. if (station.paused) return next(null, null, -19, station);
  631. return StationsModule.stationModel.updateOne(
  632. { _id: payload.stationId },
  633. {
  634. $pull: {
  635. queue: {
  636. _id: station.queue[0]._id
  637. }
  638. }
  639. },
  640. err => {
  641. if (err) return next(err);
  642. return next(null, station.queue[0], -12, station);
  643. }
  644. );
  645. }
  646. if (station.type === "community" && !station.partyMode) {
  647. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  648. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  649. if (err) return next(err);
  650. if (!playlist) return next(null, null, -13, station);
  651. playlist = playlist.songs;
  652. if (playlist.length > 0) {
  653. let currentSongIndex;
  654. if (station.currentSongIndex < playlist.length - 1)
  655. currentSongIndex = station.currentSongIndex + 1;
  656. else currentSongIndex = 0;
  657. const callback = (err, song) => {
  658. if (err) return next(err);
  659. if (song) return next(null, song, currentSongIndex, station);
  660. const currentSong = {
  661. songId: playlist[currentSongIndex].songId,
  662. title: playlist[currentSongIndex].title,
  663. duration: playlist[currentSongIndex].duration,
  664. likes: -1,
  665. dislikes: -1
  666. };
  667. return next(null, currentSong, currentSongIndex, station);
  668. };
  669. if (playlist[currentSongIndex]._id)
  670. return SongsModule.runJob(
  671. "GET_SONG",
  672. {
  673. id: playlist[currentSongIndex]._id
  674. },
  675. this
  676. )
  677. .then(response => callback(null, response.song))
  678. .catch(callback);
  679. return SongsModule.runJob(
  680. "GET_SONG_FROM_ID",
  681. {
  682. songId: playlist[currentSongIndex].songId
  683. },
  684. this
  685. )
  686. .then(response => callback(null, response.song))
  687. .catch(callback);
  688. }
  689. return next(null, null, -14, station);
  690. })
  691. );
  692. }
  693. if (station.type === "official" && station.playlist.length === 0) {
  694. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  695. .then(playlist => {
  696. if (playlist.length === 0) return next(null, this.defaultSong, 0, station);
  697. return SongsModule.runJob(
  698. "GET_SONG",
  699. {
  700. id: playlist[0]
  701. },
  702. this
  703. )
  704. .then(response => {
  705. next(null, response.song, 0, station);
  706. })
  707. .catch(() => next(null, this.defaultSong, 0, station));
  708. })
  709. .catch(next);
  710. }
  711. if (station.type === "official" && station.playlist.length > 0) {
  712. return async.doUntil(
  713. next => {
  714. if (station.currentSongIndex < station.playlist.length - 1) {
  715. SongsModule.runJob(
  716. "GET_SONG",
  717. {
  718. id: station.playlist[station.currentSongIndex + 1]
  719. },
  720. this
  721. )
  722. .then(response => next(null, response.song, station.currentSongIndex + 1))
  723. .catch(() => {
  724. station.currentSongIndex += 1;
  725. next(null, null, null);
  726. });
  727. } else {
  728. StationsModule.runJob(
  729. "CALCULATE_SONG_FOR_STATION",
  730. {
  731. station
  732. },
  733. this
  734. )
  735. .then(newPlaylist => {
  736. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  737. .then(response => {
  738. station.playlist = newPlaylist;
  739. next(null, response.song, 0);
  740. })
  741. .catch(() => next(null, this.defaultSong, 0));
  742. })
  743. .catch(() => {
  744. next(null, this.defaultSong, 0);
  745. });
  746. }
  747. },
  748. (song, currentSongIndex, next) => {
  749. if (song) return next(null, true, currentSongIndex);
  750. return next(null, false);
  751. },
  752. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  753. );
  754. }
  755. },
  756. (song, currentSongIndex, station, next) => {
  757. const $set = {};
  758. if (song === null) $set.currentSong = null;
  759. else if (song.likes === -1 && song.dislikes === -1) {
  760. $set.currentSong = {
  761. songId: song.songId,
  762. title: song.title,
  763. duration: song.duration,
  764. skipDuration: 0,
  765. likes: -1,
  766. dislikes: -1
  767. };
  768. } else {
  769. $set.currentSong = {
  770. songId: song.songId,
  771. title: song.title,
  772. artists: song.artists,
  773. duration: song.duration,
  774. likes: song.likes,
  775. dislikes: song.dislikes,
  776. skipDuration: song.skipDuration,
  777. thumbnail: song.thumbnail
  778. };
  779. }
  780. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  781. $set.startedAt = Date.now();
  782. $set.timePaused = 0;
  783. if (station.paused) $set.pausedAt = Date.now();
  784. next(null, $set, station);
  785. },
  786. ($set, station, next) => {
  787. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  788. StationsModule.runJob(
  789. "UPDATE_STATION",
  790. {
  791. stationId: station._id
  792. },
  793. this
  794. )
  795. .then(station => {
  796. if (station.type === "community" && station.partyMode === true)
  797. CacheModule.runJob("PUB", {
  798. channel: "station.queueUpdate",
  799. value: payload.stationId
  800. })
  801. .then()
  802. .catch();
  803. next(null, station);
  804. })
  805. .catch(next);
  806. });
  807. }
  808. ],
  809. async (err, station) => {
  810. if (err) {
  811. err = await UtilsModule.runJob(
  812. "GET_ERROR",
  813. {
  814. error: err
  815. },
  816. this
  817. );
  818. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  819. reject(new Error(err));
  820. } else {
  821. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  822. station.currentSong.skipVotes = 0;
  823. }
  824. // TODO Pub/Sub this
  825. UtilsModule.runJob("EMIT_TO_ROOM", {
  826. room: `station.${station._id}`,
  827. args: [
  828. "event:songs.next",
  829. {
  830. currentSong: station.currentSong,
  831. startedAt: station.startedAt,
  832. paused: station.paused,
  833. timePaused: 0
  834. }
  835. ]
  836. })
  837. .then()
  838. .catch();
  839. if (station.privacy === "public") {
  840. UtilsModule.runJob("EMIT_TO_ROOM", {
  841. room: "home",
  842. args: ["event:station.nextSong", station._id, station.currentSong]
  843. })
  844. .then()
  845. .catch();
  846. } else {
  847. const sockets = await UtilsModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  848. for (
  849. let socketId = 0, socketKeys = Object.keys(sockets);
  850. socketId < socketKeys.length;
  851. socketId += 1
  852. ) {
  853. const socket = sockets[socketId];
  854. const { session } = sockets[socketId];
  855. if (session.sessionId) {
  856. CacheModule.runJob(
  857. "HGET",
  858. {
  859. table: "sessions",
  860. key: session.sessionId
  861. },
  862. this
  863. // eslint-disable-next-line no-loop-func
  864. ).then(session => {
  865. if (session) {
  866. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  867. userModel => {
  868. userModel.findOne(
  869. {
  870. _id: session.userId
  871. },
  872. (err, user) => {
  873. if (!err && user) {
  874. if (user.role === "admin")
  875. socket.emit(
  876. "event:station.nextSong",
  877. station._id,
  878. station.currentSong
  879. );
  880. else if (
  881. station.type === "community" &&
  882. station.owner === session.userId
  883. )
  884. socket.emit(
  885. "event:station.nextSong",
  886. station._id,
  887. station.currentSong
  888. );
  889. }
  890. }
  891. );
  892. }
  893. );
  894. }
  895. });
  896. }
  897. }
  898. }
  899. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  900. UtilsModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  901. sockets: await UtilsModule.runJob(
  902. "GET_ROOM_SOCKETS",
  903. {
  904. room: `station.${station._id}`
  905. },
  906. this
  907. ),
  908. room: `song.${station.currentSong.songId}`
  909. });
  910. if (!station.paused) {
  911. NotificationsModule.runJob("SCHEDULE", {
  912. name: `stations.nextSong?id=${station._id}`,
  913. time: station.currentSong.duration * 1000,
  914. station
  915. });
  916. }
  917. } else {
  918. UtilsModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  919. sockets: await UtilsModule.runJob(
  920. "GET_ROOM_SOCKETS",
  921. {
  922. room: `station.${station._id}`
  923. },
  924. this
  925. )
  926. })
  927. .then()
  928. .catch();
  929. }
  930. resolve({ station });
  931. }
  932. }
  933. );
  934. });
  935. }
  936. /**
  937. * Checks if a user can view/access a station
  938. *
  939. * @param {object} payload - object that contains the payload
  940. * @param {object} payload.station - the station object of the station in question
  941. * @param {string} payload.userId - the id of the user in question
  942. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  943. * @returns {Promise} - returns a promise (resolve, reject)
  944. */
  945. CAN_USER_VIEW_STATION(payload) {
  946. return new Promise((resolve, reject) => {
  947. async.waterfall(
  948. [
  949. next => {
  950. if (payload.station.privacy === "public") return next(true);
  951. if (payload.station.privacy === "unlisted")
  952. if (payload.hideUnlisted === true) return next();
  953. else return next(true);
  954. if (!payload.userId) return next("Not allowed");
  955. return next();
  956. },
  957. next => {
  958. DBModule.runJob(
  959. "GET_MODEL",
  960. {
  961. modelName: "user"
  962. },
  963. this
  964. ).then(userModel => {
  965. userModel.findOne({ _id: payload.userId }, next);
  966. });
  967. },
  968. (user, next) => {
  969. if (!user) return next("Not allowed");
  970. if (user.role === "admin") return next(true);
  971. if (payload.station.type === "official") return next("Not allowed");
  972. if (payload.station.owner === payload.userId) return next(true);
  973. return next("Not allowed");
  974. }
  975. ],
  976. async errOrResult => {
  977. if (errOrResult !== true && errOrResult !== "Not allowed") {
  978. errOrResult = await UtilsModule.runJob(
  979. "GET_ERROR",
  980. {
  981. error: errOrResult
  982. },
  983. this
  984. );
  985. reject(new Error(errOrResult));
  986. } else {
  987. resolve(errOrResult === true);
  988. }
  989. }
  990. );
  991. });
  992. }
  993. }
  994. export default new _StationsModule();