stations.js 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let SongsModule;
  8. let NotificationsModule;
  9. class _StationsModule extends CoreClass {
  10. // eslint-disable-next-line require-jsdoc
  11. constructor() {
  12. super("stations");
  13. StationsModule = this;
  14. }
  15. /**
  16. * Initialises the stations module
  17. *
  18. * @returns {Promise} - returns promise (reject, resolve)
  19. */
  20. async initialize() {
  21. CacheModule = this.moduleManager.modules.cache;
  22. DBModule = this.moduleManager.modules.db;
  23. UtilsModule = this.moduleManager.modules.utils;
  24. SongsModule = this.moduleManager.modules.songs;
  25. NotificationsModule = this.moduleManager.modules.notifications;
  26. this.defaultSong = {
  27. songId: "60ItHLz5WEA",
  28. title: "Faded - Alan Walker",
  29. duration: 212,
  30. skipDuration: 0,
  31. likes: -1,
  32. dislikes: -1
  33. };
  34. // TEMP
  35. CacheModule.runJob("SUB", {
  36. channel: "station.pause",
  37. cb: async stationId => {
  38. NotificationsModule.runJob("REMOVE", {
  39. subscription: `stations.nextSong?id=${stationId}`
  40. }).then();
  41. }
  42. });
  43. CacheModule.runJob("SUB", {
  44. channel: "station.resume",
  45. cb: async stationId => {
  46. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  47. }
  48. });
  49. CacheModule.runJob("SUB", {
  50. channel: "station.queueUpdate",
  51. cb: async stationId => {
  52. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  53. if (!station.currentSong && station.queue.length > 0) {
  54. StationsModule.runJob("INITIALIZE_STATION", {
  55. stationId
  56. }).then();
  57. }
  58. });
  59. }
  60. });
  61. CacheModule.runJob("SUB", {
  62. channel: "station.newOfficialPlaylist",
  63. cb: async stationId => {
  64. CacheModule.runJob("HGET", {
  65. table: "officialPlaylists",
  66. key: stationId
  67. }).then(playlistObj => {
  68. if (playlistObj) {
  69. UtilsModule.runJob("EMIT_TO_ROOM", {
  70. room: `station.${stationId}`,
  71. args: ["event:newOfficialPlaylist", playlistObj.songs]
  72. });
  73. }
  74. });
  75. }
  76. });
  77. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  78. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  79. return new Promise((resolve, reject) =>
  80. async.waterfall(
  81. [
  82. next => {
  83. this.setStage(2);
  84. CacheModule.runJob("HGETALL", { table: "stations" })
  85. .then(stations => {
  86. next(null, stations);
  87. })
  88. .catch(next);
  89. },
  90. (stations, next) => {
  91. this.setStage(3);
  92. if (!stations) return next();
  93. const stationIds = Object.keys(stations);
  94. return async.each(
  95. stationIds,
  96. (stationId, next) => {
  97. stationModel.findOne({ _id: stationId }, (err, station) => {
  98. if (err) next(err);
  99. else if (!station) {
  100. CacheModule.runJob("HDEL", {
  101. table: "stations",
  102. key: stationId
  103. })
  104. .then(() => {
  105. next();
  106. })
  107. .catch(next);
  108. } else next();
  109. });
  110. },
  111. next
  112. );
  113. },
  114. next => {
  115. this.setStage(4);
  116. stationModel.find({}, next);
  117. },
  118. (stations, next) => {
  119. this.setStage(5);
  120. async.each(
  121. stations,
  122. (station, next2) => {
  123. async.waterfall(
  124. [
  125. next => {
  126. CacheModule.runJob("HSET", {
  127. table: "stations",
  128. key: station._id,
  129. value: stationSchema(station)
  130. })
  131. .then(station => next(null, station))
  132. .catch(next);
  133. },
  134. (station, next) => {
  135. StationsModule.runJob(
  136. "INITIALIZE_STATION",
  137. {
  138. stationId: station._id
  139. },
  140. null,
  141. -1
  142. )
  143. .then(() => {
  144. next();
  145. })
  146. .catch(next);
  147. }
  148. ],
  149. err => {
  150. next2(err);
  151. }
  152. );
  153. },
  154. next
  155. );
  156. }
  157. ],
  158. async err => {
  159. if (err) {
  160. err = await UtilsModule.runJob("GET_ERROR", {
  161. error: err
  162. });
  163. reject(new Error(err));
  164. } else {
  165. resolve();
  166. }
  167. }
  168. )
  169. );
  170. }
  171. /**
  172. * Initialises a station
  173. *
  174. * @param {object} payload - object that contains the payload
  175. * @param {string} payload.stationId - id of the station to initialise
  176. * @returns {Promise} - returns a promise (resolve, reject)
  177. */
  178. INITIALIZE_STATION(payload) {
  179. return new Promise((resolve, reject) => {
  180. // if (typeof cb !== 'function') cb = ()=>{};
  181. async.waterfall(
  182. [
  183. next => {
  184. StationsModule.runJob(
  185. "GET_STATION",
  186. {
  187. stationId: payload.stationId
  188. },
  189. this
  190. )
  191. .then(station => {
  192. next(null, station);
  193. })
  194. .catch(next);
  195. },
  196. (station, next) => {
  197. if (!station) return next("Station not found.");
  198. NotificationsModule.runJob("UNSCHEDULE", {
  199. name: `stations.nextSong?id=${station._id}`
  200. })
  201. .then()
  202. .catch()
  203. .finally(() => {
  204. NotificationsModule.runJob(
  205. "SUBSCRIBE",
  206. {
  207. name: `stations.nextSong?id=${station._id}`,
  208. cb: () =>
  209. StationsModule.runJob("SKIP_STATION", {
  210. stationId: station._id
  211. }),
  212. unique: true,
  213. station
  214. },
  215. this
  216. )
  217. .then()
  218. .catch();
  219. if (station.paused) return next(true, station);
  220. return next(null, station);
  221. });
  222. },
  223. (station, next) => {
  224. if (!station.currentSong) {
  225. return StationsModule.runJob(
  226. "SKIP_STATION",
  227. {
  228. stationId: station._id
  229. },
  230. this
  231. )
  232. .then(station => {
  233. next(true, station);
  234. })
  235. .catch(next)
  236. .finally(() => {});
  237. }
  238. let timeLeft =
  239. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  240. if (Number.isNaN(timeLeft)) timeLeft = -1;
  241. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  242. return StationsModule.runJob(
  243. "SKIP_STATION",
  244. {
  245. stationId: station._id
  246. },
  247. this
  248. )
  249. .then(station => {
  250. next(null, station);
  251. })
  252. .catch(next);
  253. }
  254. // name, time, cb, station
  255. NotificationsModule.runJob("SCHEDULE", {
  256. name: `stations.nextSong?id=${station._id}`,
  257. time: timeLeft,
  258. station
  259. });
  260. return next(null, station);
  261. }
  262. ],
  263. async (err, station) => {
  264. if (err && err !== true) {
  265. err = await UtilsModule.runJob(
  266. "GET_ERROR",
  267. {
  268. error: err
  269. },
  270. this
  271. );
  272. reject(new Error(err));
  273. } else resolve(station);
  274. }
  275. );
  276. });
  277. }
  278. /**
  279. * Calculates the next song for the station
  280. *
  281. * @param {object} payload - object that contains the payload
  282. * @param {string} payload.station - station object to calculate song for
  283. * @returns {Promise} - returns a promise (resolve, reject)
  284. */
  285. async CALCULATE_SONG_FOR_STATION(payload) {
  286. // station, bypassValidate = false
  287. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  288. return new Promise((resolve, reject) => {
  289. const songList = [];
  290. return async.waterfall(
  291. [
  292. next => {
  293. if (payload.station.genres.length === 0) return next();
  294. const genresDone = [];
  295. return payload.station.genres.forEach(genre => {
  296. songModel.find({ genres: genre }, (err, songs) => {
  297. if (!err) {
  298. songs.forEach(song => {
  299. if (songList.indexOf(song._id) === -1) {
  300. let found = false;
  301. song.genres.forEach(songGenre => {
  302. if (payload.station.blacklistedGenres.indexOf(songGenre) !== -1)
  303. found = true;
  304. });
  305. if (!found) {
  306. songList.push(song._id);
  307. }
  308. }
  309. });
  310. }
  311. genresDone.push(genre);
  312. if (genresDone.length === payload.station.genres.length) next();
  313. });
  314. });
  315. },
  316. next => {
  317. const playlist = [];
  318. songList.forEach(songId => {
  319. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  320. });
  321. // eslint-disable-next-line array-callback-return
  322. payload.station.playlist.filter(songId => {
  323. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  324. });
  325. UtilsModule.runJob("SHUFFLE", { array: playlist })
  326. .then(result => {
  327. next(null, result.array);
  328. }, this)
  329. .catch(next);
  330. },
  331. (playlist, next) => {
  332. StationsModule.runJob(
  333. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  334. {
  335. stationId: payload.station._id,
  336. songList: playlist
  337. },
  338. this
  339. )
  340. .then(() => {
  341. next(null, playlist);
  342. })
  343. .catch(next);
  344. },
  345. (playlist, next) => {
  346. StationsModule.stationModel.updateOne(
  347. { _id: payload.station._id },
  348. { $set: { playlist } },
  349. { runValidators: true },
  350. () => {
  351. StationsModule.runJob(
  352. "UPDATE_STATION",
  353. {
  354. stationId: payload.station._id
  355. },
  356. this
  357. )
  358. .then(() => {
  359. next(null, playlist);
  360. })
  361. .catch(next);
  362. }
  363. );
  364. }
  365. ],
  366. (err, newPlaylist) => {
  367. if (err) return reject(new Error(err));
  368. return resolve(newPlaylist);
  369. }
  370. );
  371. });
  372. }
  373. /**
  374. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  375. *
  376. * @param {object} payload - object that contains the payload
  377. * @param {string} payload.stationId - id of the station
  378. * @returns {Promise} - returns a promise (resolve, reject)
  379. */
  380. GET_STATION(payload) {
  381. return new Promise((resolve, reject) => {
  382. async.waterfall(
  383. [
  384. next => {
  385. CacheModule.runJob(
  386. "HGET",
  387. {
  388. table: "stations",
  389. key: payload.stationId
  390. },
  391. this
  392. )
  393. .then(station => {
  394. next(null, station);
  395. })
  396. .catch(next);
  397. },
  398. (station, next) => {
  399. if (station) return next(true, station);
  400. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  401. },
  402. (station, next) => {
  403. if (station) {
  404. if (station.type === "official") {
  405. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  406. stationId: station._id,
  407. songList: station.playlist
  408. })
  409. .then()
  410. .catch();
  411. }
  412. station = StationsModule.stationSchema(station);
  413. CacheModule.runJob("HSET", {
  414. table: "stations",
  415. key: payload.stationId,
  416. value: station
  417. })
  418. .then()
  419. .catch();
  420. next(true, station);
  421. } else next("Station not found");
  422. }
  423. ],
  424. async (err, station) => {
  425. if (err && err !== true) {
  426. err = await UtilsModule.runJob(
  427. "GET_ERROR",
  428. {
  429. error: err
  430. },
  431. this
  432. );
  433. reject(new Error(err));
  434. } else resolve(station);
  435. }
  436. );
  437. });
  438. }
  439. /**
  440. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  441. *
  442. * @param {object} payload - object that contains the payload
  443. * @param {string} payload.stationName - the unique name of the station
  444. * @returns {Promise} - returns a promise (resolve, reject)
  445. */
  446. async GET_STATION_BY_NAME(payload) {
  447. return new Promise((resolve, reject) =>
  448. async.waterfall(
  449. [
  450. next => {
  451. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  452. },
  453. (station, next) => {
  454. if (station) {
  455. if (station.type === "official") {
  456. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  457. stationId: station._id,
  458. songList: station.playlist
  459. });
  460. }
  461. station = StationsModule.stationSchema(station);
  462. CacheModule.runJob("HSET", {
  463. table: "stations",
  464. key: station._id,
  465. value: station
  466. });
  467. next(true, station);
  468. } else next("Station not found");
  469. }
  470. ],
  471. (err, station) => {
  472. if (err && err !== true) return reject(new Error(err));
  473. return resolve(station);
  474. }
  475. )
  476. );
  477. }
  478. /**
  479. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  480. *
  481. * @param {object} payload - object that contains the payload
  482. * @param {string} payload.stationId - the id of the station to update
  483. * @returns {Promise} - returns a promise (resolve, reject)
  484. */
  485. UPDATE_STATION(payload) {
  486. return new Promise((resolve, reject) => {
  487. async.waterfall(
  488. [
  489. next => {
  490. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  491. },
  492. (station, next) => {
  493. if (!station) {
  494. CacheModule.runJob("HDEL", {
  495. table: "stations",
  496. key: payload.stationId
  497. })
  498. .then()
  499. .catch();
  500. return next("Station not found");
  501. }
  502. return CacheModule.runJob(
  503. "HSET",
  504. {
  505. table: "stations",
  506. key: payload.stationId,
  507. value: station
  508. },
  509. this
  510. )
  511. .then(station => {
  512. next(null, station);
  513. })
  514. .catch(next);
  515. }
  516. ],
  517. async (err, station) => {
  518. if (err && err !== true) {
  519. err = await UtilsModule.runJob(
  520. "GET_ERROR",
  521. {
  522. error: err
  523. },
  524. this
  525. );
  526. reject(new Error(err));
  527. } else resolve(station);
  528. }
  529. );
  530. });
  531. }
  532. /**
  533. * Creates the official playlist for a station
  534. *
  535. * @param {object} payload - object that contains the payload
  536. * @param {string} payload.stationId - the id of the station
  537. * @param {Array} payload.songList - list of songs to put in official playlist
  538. * @returns {Promise} - returns a promise (resolve, reject)
  539. */
  540. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  541. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  542. console.log(typeof payload.songList, payload.songList);
  543. return new Promise(resolve => {
  544. const lessInfoPlaylist = [];
  545. return async.each(
  546. payload.songList,
  547. (song, next) => {
  548. SongsModule.runJob("GET_SONG", { id: song }, this)
  549. .then(response => {
  550. const { song } = response;
  551. if (song) {
  552. const newSong = {
  553. songId: song.songId,
  554. title: song.title,
  555. artists: song.artists,
  556. duration: song.duration
  557. };
  558. lessInfoPlaylist.push(newSong);
  559. }
  560. })
  561. .finally(() => {
  562. next();
  563. });
  564. },
  565. () => {
  566. CacheModule.runJob(
  567. "HSET",
  568. {
  569. table: "officialPlaylists",
  570. key: payload.stationId,
  571. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  572. },
  573. this
  574. ).finally(() => {
  575. CacheModule.runJob("PUB", {
  576. channel: "station.newOfficialPlaylist",
  577. value: payload.stationId
  578. });
  579. resolve();
  580. });
  581. }
  582. );
  583. });
  584. }
  585. /**
  586. * Skips a station
  587. *
  588. * @param {object} payload - object that contains the payload
  589. * @param {string} payload.stationId - the id of the station to skip
  590. * @returns {Promise} - returns a promise (resolve, reject)
  591. */
  592. SKIP_STATION(payload) {
  593. return new Promise((resolve, reject) => {
  594. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  595. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  596. async.waterfall(
  597. [
  598. next => {
  599. StationsModule.runJob(
  600. "GET_STATION",
  601. {
  602. stationId: payload.stationId
  603. },
  604. this
  605. )
  606. .then(station => {
  607. next(null, station);
  608. })
  609. .catch(() => {});
  610. },
  611. // eslint-disable-next-line consistent-return
  612. (station, next) => {
  613. if (!station) return next("Station not found.");
  614. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  615. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  616. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  617. // Community station with party mode enabled and songs in the queue
  618. if (station.paused) return next(null, null, -19, station);
  619. return StationsModule.stationModel.updateOne(
  620. { _id: payload.stationId },
  621. {
  622. $pull: {
  623. queue: {
  624. _id: station.queue[0]._id
  625. }
  626. }
  627. },
  628. err => {
  629. if (err) return next(err);
  630. return next(null, station.queue[0], -12, station);
  631. }
  632. );
  633. }
  634. if (station.type === "community" && !station.partyMode) {
  635. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  636. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  637. if (err) return next(err);
  638. if (!playlist) return next(null, null, -13, station);
  639. playlist = playlist.songs;
  640. if (playlist.length > 0) {
  641. let currentSongIndex;
  642. if (station.currentSongIndex < playlist.length - 1)
  643. currentSongIndex = station.currentSongIndex + 1;
  644. else currentSongIndex = 0;
  645. const callback = (err, song) => {
  646. if (err) return next(err);
  647. if (song) return next(null, song, currentSongIndex, station);
  648. const currentSong = {
  649. songId: playlist[currentSongIndex].songId,
  650. title: playlist[currentSongIndex].title,
  651. duration: playlist[currentSongIndex].duration,
  652. likes: -1,
  653. dislikes: -1
  654. };
  655. return next(null, currentSong, currentSongIndex, station);
  656. };
  657. if (playlist[currentSongIndex]._id)
  658. return SongsModule.runJob(
  659. "GET_SONG",
  660. {
  661. id: playlist[currentSongIndex]._id
  662. },
  663. this
  664. )
  665. .then(response => callback(null, response.song))
  666. .catch(callback);
  667. return SongsModule.runJob(
  668. "GET_SONG_FROM_ID",
  669. {
  670. songId: playlist[currentSongIndex].songId
  671. },
  672. this
  673. )
  674. .then(response => callback(null, response.song))
  675. .catch(callback);
  676. }
  677. return next(null, null, -14, station);
  678. })
  679. );
  680. }
  681. if (station.type === "official" && station.playlist.length === 0) {
  682. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  683. .then(playlist => {
  684. if (playlist.length === 0)
  685. return next(null, StationsModule.defaultSong, 0, station);
  686. return SongsModule.runJob(
  687. "GET_SONG",
  688. {
  689. id: playlist[0]
  690. },
  691. this
  692. )
  693. .then(response => {
  694. next(null, response.song, 0, station);
  695. })
  696. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  697. })
  698. .catch(err => {
  699. next(err);
  700. });
  701. }
  702. if (station.type === "official" && station.playlist.length > 0) {
  703. return async.doUntil(
  704. next => {
  705. if (station.currentSongIndex < station.playlist.length - 1) {
  706. SongsModule.runJob(
  707. "GET_SONG",
  708. {
  709. id: station.playlist[station.currentSongIndex + 1]
  710. },
  711. this
  712. )
  713. .then(response => next(null, response.song, station.currentSongIndex + 1))
  714. .catch(() => {
  715. station.currentSongIndex += 1;
  716. next(null, null, null);
  717. });
  718. } else {
  719. StationsModule.runJob(
  720. "CALCULATE_SONG_FOR_STATION",
  721. {
  722. station
  723. },
  724. this
  725. )
  726. .then(newPlaylist => {
  727. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  728. .then(response => {
  729. station.playlist = newPlaylist;
  730. next(null, response.song, 0);
  731. })
  732. .catch(() => next(null, StationsModule.defaultSong, 0));
  733. })
  734. .catch(() => {
  735. next(null, StationsModule.defaultSong, 0);
  736. });
  737. }
  738. },
  739. (song, currentSongIndex, next) => {
  740. if (song) return next(null, true, currentSongIndex);
  741. return next(null, false);
  742. },
  743. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  744. );
  745. }
  746. },
  747. (song, currentSongIndex, station, next) => {
  748. const $set = {};
  749. if (song === null) $set.currentSong = null;
  750. else if (song.likes === -1 && song.dislikes === -1) {
  751. $set.currentSong = {
  752. songId: song.songId,
  753. title: song.title,
  754. duration: song.duration,
  755. skipDuration: 0,
  756. likes: -1,
  757. dislikes: -1
  758. };
  759. } else {
  760. $set.currentSong = {
  761. songId: song.songId,
  762. title: song.title,
  763. artists: song.artists,
  764. duration: song.duration,
  765. likes: song.likes,
  766. dislikes: song.dislikes,
  767. skipDuration: song.skipDuration,
  768. thumbnail: song.thumbnail
  769. };
  770. }
  771. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  772. $set.startedAt = Date.now();
  773. $set.timePaused = 0;
  774. if (station.paused) $set.pausedAt = Date.now();
  775. next(null, $set, station);
  776. },
  777. ($set, station, next) => {
  778. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  779. StationsModule.runJob(
  780. "UPDATE_STATION",
  781. {
  782. stationId: station._id
  783. },
  784. this
  785. )
  786. .then(station => {
  787. if (station.type === "community" && station.partyMode === true)
  788. CacheModule.runJob("PUB", {
  789. channel: "station.queueUpdate",
  790. value: payload.stationId
  791. })
  792. .then()
  793. .catch();
  794. next(null, station);
  795. })
  796. .catch(next);
  797. });
  798. }
  799. ],
  800. async (err, station) => {
  801. if (err) {
  802. err = await UtilsModule.runJob(
  803. "GET_ERROR",
  804. {
  805. error: err
  806. },
  807. this
  808. );
  809. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  810. reject(new Error(err));
  811. } else {
  812. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  813. station.currentSong.skipVotes = 0;
  814. }
  815. // TODO Pub/Sub this
  816. UtilsModule.runJob("EMIT_TO_ROOM", {
  817. room: `station.${station._id}`,
  818. args: [
  819. "event:songs.next",
  820. {
  821. currentSong: station.currentSong,
  822. startedAt: station.startedAt,
  823. paused: station.paused,
  824. timePaused: 0
  825. }
  826. ]
  827. })
  828. .then()
  829. .catch();
  830. if (station.privacy === "public") {
  831. UtilsModule.runJob("EMIT_TO_ROOM", {
  832. room: "home",
  833. args: ["event:station.nextSong", station._id, station.currentSong]
  834. })
  835. .then()
  836. .catch();
  837. } else {
  838. const sockets = await UtilsModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  839. Object.keys(sockets).forEach(socketKey => {
  840. const socket = sockets[socketKey];
  841. const { session } = socket;
  842. if (session.sessionId) {
  843. CacheModule.runJob(
  844. "HGET",
  845. {
  846. table: "sessions",
  847. key: session.sessionId
  848. },
  849. this
  850. // eslint-disable-next-line no-loop-func
  851. ).then(session => {
  852. if (session) {
  853. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  854. userModel => {
  855. userModel.findOne(
  856. {
  857. _id: session.userId
  858. },
  859. (err, user) => {
  860. if (!err && user) {
  861. if (user.role === "admin")
  862. socket.emit(
  863. "event:station.nextSong",
  864. station._id,
  865. station.currentSong
  866. );
  867. else if (
  868. station.type === "community" &&
  869. station.owner === session.userId
  870. )
  871. socket.emit(
  872. "event:station.nextSong",
  873. station._id,
  874. station.currentSong
  875. );
  876. }
  877. }
  878. );
  879. }
  880. );
  881. }
  882. });
  883. }
  884. });
  885. }
  886. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  887. UtilsModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  888. sockets: await UtilsModule.runJob(
  889. "GET_ROOM_SOCKETS",
  890. {
  891. room: `station.${station._id}`
  892. },
  893. this
  894. ),
  895. room: `song.${station.currentSong.songId}`
  896. });
  897. if (!station.paused) {
  898. NotificationsModule.runJob("SCHEDULE", {
  899. name: `stations.nextSong?id=${station._id}`,
  900. time: station.currentSong.duration * 1000,
  901. station
  902. });
  903. }
  904. } else {
  905. UtilsModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  906. sockets: await UtilsModule.runJob(
  907. "GET_ROOM_SOCKETS",
  908. {
  909. room: `station.${station._id}`
  910. },
  911. this
  912. )
  913. })
  914. .then()
  915. .catch();
  916. }
  917. resolve({ station });
  918. }
  919. }
  920. );
  921. });
  922. }
  923. /**
  924. * Checks if a user can view/access a station
  925. *
  926. * @param {object} payload - object that contains the payload
  927. * @param {object} payload.station - the station object of the station in question
  928. * @param {string} payload.userId - the id of the user in question
  929. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  930. * @returns {Promise} - returns a promise (resolve, reject)
  931. */
  932. CAN_USER_VIEW_STATION(payload) {
  933. return new Promise((resolve, reject) => {
  934. async.waterfall(
  935. [
  936. next => {
  937. if (payload.station.privacy === "public") return next(true);
  938. if (payload.station.privacy === "unlisted")
  939. if (payload.hideUnlisted === true) return next();
  940. else return next(true);
  941. if (!payload.userId) return next("Not allowed");
  942. return next();
  943. },
  944. next => {
  945. DBModule.runJob(
  946. "GET_MODEL",
  947. {
  948. modelName: "user"
  949. },
  950. this
  951. ).then(userModel => {
  952. userModel.findOne({ _id: payload.userId }, next);
  953. });
  954. },
  955. (user, next) => {
  956. if (!user) return next("Not allowed");
  957. if (user.role === "admin") return next(true);
  958. if (payload.station.type === "official") return next("Not allowed");
  959. if (payload.station.owner === payload.userId) return next(true);
  960. return next("Not allowed");
  961. }
  962. ],
  963. async errOrResult => {
  964. if (errOrResult !== true && errOrResult !== "Not allowed") {
  965. errOrResult = await UtilsModule.runJob(
  966. "GET_ERROR",
  967. {
  968. error: errOrResult
  969. },
  970. this
  971. );
  972. reject(new Error(errOrResult));
  973. } else {
  974. resolve(errOrResult === true);
  975. }
  976. }
  977. );
  978. });
  979. }
  980. }
  981. export default new _StationsModule();