stations.js 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let IOModule;
  8. let SongsModule;
  9. let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("stations");
  14. StationsModule = this;
  15. }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. IOModule = this.moduleManager.modules.io;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1
  35. };
  36. // TEMP
  37. CacheModule.runJob("SUB", {
  38. channel: "station.pause",
  39. cb: async stationId => {
  40. NotificationsModule.runJob("REMOVE", {
  41. subscription: `stations.nextSong?id=${stationId}`
  42. }).then();
  43. }
  44. });
  45. CacheModule.runJob("SUB", {
  46. channel: "station.resume",
  47. cb: async stationId => {
  48. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  49. }
  50. });
  51. CacheModule.runJob("SUB", {
  52. channel: "station.queueUpdate",
  53. cb: async stationId => {
  54. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  55. if (!station.currentSong && station.queue.length > 0) {
  56. StationsModule.runJob("INITIALIZE_STATION", {
  57. stationId
  58. }).then();
  59. }
  60. });
  61. }
  62. });
  63. CacheModule.runJob("SUB", {
  64. channel: "station.newOfficialPlaylist",
  65. cb: async stationId => {
  66. CacheModule.runJob("HGET", {
  67. table: "officialPlaylists",
  68. key: stationId
  69. }).then(playlistObj => {
  70. if (playlistObj) {
  71. IOModule.runJob("EMIT_TO_ROOM", {
  72. room: `station.${stationId}`,
  73. args: ["event:newOfficialPlaylist", playlistObj.songs]
  74. });
  75. }
  76. });
  77. }
  78. });
  79. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  80. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  81. return new Promise((resolve, reject) =>
  82. async.waterfall(
  83. [
  84. next => {
  85. this.setStage(2);
  86. CacheModule.runJob("HGETALL", { table: "stations" })
  87. .then(stations => {
  88. next(null, stations);
  89. })
  90. .catch(next);
  91. },
  92. (stations, next) => {
  93. this.setStage(3);
  94. if (!stations) return next();
  95. const stationIds = Object.keys(stations);
  96. return async.each(
  97. stationIds,
  98. (stationId, next) => {
  99. stationModel.findOne({ _id: stationId }, (err, station) => {
  100. if (err) next(err);
  101. else if (!station) {
  102. CacheModule.runJob("HDEL", {
  103. table: "stations",
  104. key: stationId
  105. })
  106. .then(() => {
  107. next();
  108. })
  109. .catch(next);
  110. } else next();
  111. });
  112. },
  113. next
  114. );
  115. },
  116. next => {
  117. this.setStage(4);
  118. stationModel.find({}, next);
  119. },
  120. (stations, next) => {
  121. this.setStage(5);
  122. async.each(
  123. stations,
  124. (station, next2) => {
  125. async.waterfall(
  126. [
  127. next => {
  128. CacheModule.runJob("HSET", {
  129. table: "stations",
  130. key: station._id,
  131. value: stationSchema(station)
  132. })
  133. .then(station => next(null, station))
  134. .catch(next);
  135. },
  136. (station, next) => {
  137. StationsModule.runJob(
  138. "INITIALIZE_STATION",
  139. {
  140. stationId: station._id
  141. },
  142. null,
  143. -1
  144. )
  145. .then(() => {
  146. next();
  147. })
  148. .catch(next);
  149. }
  150. ],
  151. err => {
  152. next2(err);
  153. }
  154. );
  155. },
  156. next
  157. );
  158. }
  159. ],
  160. async err => {
  161. if (err) {
  162. err = await UtilsModule.runJob("GET_ERROR", {
  163. error: err
  164. });
  165. reject(new Error(err));
  166. } else {
  167. resolve();
  168. }
  169. }
  170. )
  171. );
  172. }
  173. /**
  174. * Initialises a station
  175. *
  176. * @param {object} payload - object that contains the payload
  177. * @param {string} payload.stationId - id of the station to initialise
  178. * @returns {Promise} - returns a promise (resolve, reject)
  179. */
  180. INITIALIZE_STATION(payload) {
  181. return new Promise((resolve, reject) => {
  182. // if (typeof cb !== 'function') cb = ()=>{};
  183. async.waterfall(
  184. [
  185. next => {
  186. StationsModule.runJob(
  187. "GET_STATION",
  188. {
  189. stationId: payload.stationId
  190. },
  191. this
  192. )
  193. .then(station => {
  194. next(null, station);
  195. })
  196. .catch(next);
  197. },
  198. (station, next) => {
  199. if (!station) return next("Station not found.");
  200. return NotificationsModule.runJob(
  201. "UNSCHEDULE",
  202. {
  203. name: `stations.nextSong?id=${station._id}`
  204. },
  205. this
  206. )
  207. .then()
  208. .catch()
  209. .finally(() => {
  210. NotificationsModule.runJob(
  211. "SUBSCRIBE",
  212. {
  213. name: `stations.nextSong?id=${station._id}`,
  214. cb: () =>
  215. StationsModule.runJob("SKIP_STATION", {
  216. stationId: station._id
  217. }),
  218. unique: true,
  219. station
  220. },
  221. this
  222. )
  223. .then()
  224. .catch();
  225. if (station.paused) return next(true, station);
  226. return next(null, station);
  227. });
  228. },
  229. (station, next) => {
  230. if (!station.currentSong) {
  231. return StationsModule.runJob(
  232. "SKIP_STATION",
  233. {
  234. stationId: station._id
  235. },
  236. this
  237. )
  238. .then(station => {
  239. next(true, station);
  240. })
  241. .catch(next)
  242. .finally(() => {});
  243. }
  244. let timeLeft =
  245. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  246. if (Number.isNaN(timeLeft)) timeLeft = -1;
  247. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  248. return StationsModule.runJob(
  249. "SKIP_STATION",
  250. {
  251. stationId: station._id
  252. },
  253. this
  254. )
  255. .then(station => {
  256. next(null, station);
  257. })
  258. .catch(next);
  259. }
  260. // name, time, cb, station
  261. NotificationsModule.runJob("SCHEDULE", {
  262. name: `stations.nextSong?id=${station._id}`,
  263. time: timeLeft,
  264. station
  265. });
  266. return next(null, station);
  267. }
  268. ],
  269. async (err, station) => {
  270. if (err && err !== true) {
  271. err = await UtilsModule.runJob(
  272. "GET_ERROR",
  273. {
  274. error: err
  275. },
  276. this
  277. );
  278. reject(new Error(err));
  279. } else resolve(station);
  280. }
  281. );
  282. });
  283. }
  284. /**
  285. * Calculates the next song for the station
  286. *
  287. * @param {object} payload - object that contains the payload
  288. * @param {string} payload.station - station object to calculate song for
  289. * @returns {Promise} - returns a promise (resolve, reject)
  290. */
  291. async CALCULATE_SONG_FOR_STATION(payload) {
  292. // station, bypassValidate = false
  293. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  294. return new Promise((resolve, reject) => {
  295. const songList = [];
  296. return async.waterfall(
  297. [
  298. next => {
  299. if (payload.station.genres.length === 0) return next();
  300. const genresDone = [];
  301. return payload.station.genres.forEach(genre => {
  302. songModel.find({ genres: genre }, (err, songs) => {
  303. if (!err) {
  304. songs.forEach(song => {
  305. if (songList.indexOf(song._id) === -1) {
  306. let found = false;
  307. song.genres.forEach(songGenre => {
  308. if (payload.station.blacklistedGenres.indexOf(songGenre) !== -1)
  309. found = true;
  310. });
  311. if (!found) {
  312. songList.push(song._id);
  313. }
  314. }
  315. });
  316. }
  317. genresDone.push(genre);
  318. if (genresDone.length === payload.station.genres.length) next();
  319. });
  320. });
  321. },
  322. next => {
  323. const playlist = [];
  324. songList.forEach(songId => {
  325. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  326. });
  327. // eslint-disable-next-line array-callback-return
  328. payload.station.playlist.filter(songId => {
  329. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  330. });
  331. UtilsModule.runJob("SHUFFLE", { array: playlist })
  332. .then(result => {
  333. next(null, result.array);
  334. }, this)
  335. .catch(next);
  336. },
  337. (playlist, next) => {
  338. StationsModule.runJob(
  339. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  340. {
  341. stationId: payload.station._id,
  342. songList: playlist
  343. },
  344. this
  345. )
  346. .then(() => {
  347. next(null, playlist);
  348. })
  349. .catch(next);
  350. },
  351. (playlist, next) => {
  352. StationsModule.stationModel.updateOne(
  353. { _id: payload.station._id },
  354. { $set: { playlist } },
  355. { runValidators: true },
  356. () => {
  357. StationsModule.runJob(
  358. "UPDATE_STATION",
  359. {
  360. stationId: payload.station._id
  361. },
  362. this
  363. )
  364. .then(() => {
  365. next(null, playlist);
  366. })
  367. .catch(next);
  368. }
  369. );
  370. }
  371. ],
  372. (err, newPlaylist) => {
  373. if (err) return reject(new Error(err));
  374. return resolve(newPlaylist);
  375. }
  376. );
  377. });
  378. }
  379. /**
  380. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  381. *
  382. * @param {object} payload - object that contains the payload
  383. * @param {string} payload.stationId - id of the station
  384. * @returns {Promise} - returns a promise (resolve, reject)
  385. */
  386. GET_STATION(payload) {
  387. return new Promise((resolve, reject) => {
  388. async.waterfall(
  389. [
  390. next => {
  391. CacheModule.runJob(
  392. "HGET",
  393. {
  394. table: "stations",
  395. key: payload.stationId
  396. },
  397. this
  398. )
  399. .then(station => {
  400. next(null, station);
  401. })
  402. .catch(next);
  403. },
  404. (station, next) => {
  405. if (station) return next(true, station);
  406. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  407. },
  408. (station, next) => {
  409. if (station) {
  410. if (station.type === "official") {
  411. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  412. stationId: station._id,
  413. songList: station.playlist
  414. })
  415. .then()
  416. .catch();
  417. }
  418. station = StationsModule.stationSchema(station);
  419. CacheModule.runJob("HSET", {
  420. table: "stations",
  421. key: payload.stationId,
  422. value: station
  423. })
  424. .then()
  425. .catch();
  426. next(true, station);
  427. } else next("Station not found");
  428. }
  429. ],
  430. async (err, station) => {
  431. if (err && err !== true) {
  432. err = await UtilsModule.runJob(
  433. "GET_ERROR",
  434. {
  435. error: err
  436. },
  437. this
  438. );
  439. reject(new Error(err));
  440. } else resolve(station);
  441. }
  442. );
  443. });
  444. }
  445. /**
  446. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  447. *
  448. * @param {object} payload - object that contains the payload
  449. * @param {string} payload.stationName - the unique name of the station
  450. * @returns {Promise} - returns a promise (resolve, reject)
  451. */
  452. async GET_STATION_BY_NAME(payload) {
  453. return new Promise((resolve, reject) =>
  454. async.waterfall(
  455. [
  456. next => {
  457. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  458. },
  459. (station, next) => {
  460. if (station) {
  461. if (station.type === "official") {
  462. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  463. stationId: station._id,
  464. songList: station.playlist
  465. });
  466. }
  467. station = StationsModule.stationSchema(station);
  468. CacheModule.runJob("HSET", {
  469. table: "stations",
  470. key: station._id,
  471. value: station
  472. });
  473. next(true, station);
  474. } else next("Station not found");
  475. }
  476. ],
  477. (err, station) => {
  478. if (err && err !== true) return reject(new Error(err));
  479. return resolve(station);
  480. }
  481. )
  482. );
  483. }
  484. /**
  485. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  486. *
  487. * @param {object} payload - object that contains the payload
  488. * @param {string} payload.stationId - the id of the station to update
  489. * @returns {Promise} - returns a promise (resolve, reject)
  490. */
  491. UPDATE_STATION(payload) {
  492. return new Promise((resolve, reject) => {
  493. async.waterfall(
  494. [
  495. next => {
  496. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  497. },
  498. (station, next) => {
  499. if (!station) {
  500. CacheModule.runJob("HDEL", {
  501. table: "stations",
  502. key: payload.stationId
  503. })
  504. .then()
  505. .catch();
  506. return next("Station not found");
  507. }
  508. return CacheModule.runJob(
  509. "HSET",
  510. {
  511. table: "stations",
  512. key: payload.stationId,
  513. value: station
  514. },
  515. this
  516. )
  517. .then(station => {
  518. next(null, station);
  519. })
  520. .catch(next);
  521. }
  522. ],
  523. async (err, station) => {
  524. if (err && err !== true) {
  525. err = await UtilsModule.runJob(
  526. "GET_ERROR",
  527. {
  528. error: err
  529. },
  530. this
  531. );
  532. reject(new Error(err));
  533. } else resolve(station);
  534. }
  535. );
  536. });
  537. }
  538. /**
  539. * Creates the official playlist for a station
  540. *
  541. * @param {object} payload - object that contains the payload
  542. * @param {string} payload.stationId - the id of the station
  543. * @param {Array} payload.songList - list of songs to put in official playlist
  544. * @returns {Promise} - returns a promise (resolve, reject)
  545. */
  546. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  547. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  548. console.log(typeof payload.songList, payload.songList);
  549. return new Promise(resolve => {
  550. const lessInfoPlaylist = [];
  551. return async.each(
  552. payload.songList,
  553. (song, next) => {
  554. SongsModule.runJob("GET_SONG", { id: song }, this)
  555. .then(response => {
  556. const { song } = response;
  557. if (song) {
  558. const newSong = {
  559. songId: song.songId,
  560. title: song.title,
  561. artists: song.artists,
  562. duration: song.duration
  563. };
  564. lessInfoPlaylist.push(newSong);
  565. }
  566. })
  567. .finally(() => {
  568. next();
  569. });
  570. },
  571. () => {
  572. CacheModule.runJob(
  573. "HSET",
  574. {
  575. table: "officialPlaylists",
  576. key: payload.stationId,
  577. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  578. },
  579. this
  580. ).finally(() => {
  581. CacheModule.runJob("PUB", {
  582. channel: "station.newOfficialPlaylist",
  583. value: payload.stationId
  584. });
  585. resolve();
  586. });
  587. }
  588. );
  589. });
  590. }
  591. /**
  592. * Skips a station
  593. *
  594. * @param {object} payload - object that contains the payload
  595. * @param {string} payload.stationId - the id of the station to skip
  596. * @returns {Promise} - returns a promise (resolve, reject)
  597. */
  598. SKIP_STATION(payload) {
  599. return new Promise((resolve, reject) => {
  600. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  601. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  602. async.waterfall(
  603. [
  604. next => {
  605. StationsModule.runJob(
  606. "GET_STATION",
  607. {
  608. stationId: payload.stationId
  609. },
  610. this
  611. )
  612. .then(station => {
  613. next(null, station);
  614. })
  615. .catch(() => {});
  616. },
  617. // eslint-disable-next-line consistent-return
  618. (station, next) => {
  619. if (!station) return next("Station not found.");
  620. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  621. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  622. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  623. // Community station with party mode enabled and songs in the queue
  624. if (station.paused) return next(null, null, -19, station);
  625. return StationsModule.stationModel.updateOne(
  626. { _id: payload.stationId },
  627. {
  628. $pull: {
  629. queue: {
  630. _id: station.queue[0]._id
  631. }
  632. }
  633. },
  634. err => {
  635. if (err) return next(err);
  636. return next(null, station.queue[0], -12, station);
  637. }
  638. );
  639. }
  640. if (station.type === "community" && !station.partyMode) {
  641. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  642. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  643. if (err) return next(err);
  644. if (!playlist) return next(null, null, -13, station);
  645. playlist = playlist.songs;
  646. if (playlist.length > 0) {
  647. let currentSongIndex;
  648. if (station.currentSongIndex < playlist.length - 1)
  649. currentSongIndex = station.currentSongIndex + 1;
  650. else currentSongIndex = 0;
  651. const callback = (err, song) => {
  652. if (err) return next(err);
  653. if (song) return next(null, song, currentSongIndex, station);
  654. const currentSong = {
  655. songId: playlist[currentSongIndex].songId,
  656. title: playlist[currentSongIndex].title,
  657. duration: playlist[currentSongIndex].duration,
  658. likes: -1,
  659. dislikes: -1
  660. };
  661. return next(null, currentSong, currentSongIndex, station);
  662. };
  663. if (playlist[currentSongIndex]._id)
  664. return SongsModule.runJob(
  665. "GET_SONG",
  666. {
  667. id: playlist[currentSongIndex]._id
  668. },
  669. this
  670. )
  671. .then(response => callback(null, response.song))
  672. .catch(callback);
  673. return SongsModule.runJob(
  674. "GET_SONG_FROM_ID",
  675. {
  676. songId: playlist[currentSongIndex].songId
  677. },
  678. this
  679. )
  680. .then(response => callback(null, response.song))
  681. .catch(callback);
  682. }
  683. return next(null, null, -14, station);
  684. })
  685. );
  686. }
  687. if (station.type === "official" && station.playlist.length === 0) {
  688. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  689. .then(playlist => {
  690. if (playlist.length === 0)
  691. return next(null, StationsModule.defaultSong, 0, station);
  692. return SongsModule.runJob(
  693. "GET_SONG",
  694. {
  695. id: playlist[0]
  696. },
  697. this
  698. )
  699. .then(response => {
  700. next(null, response.song, 0, station);
  701. })
  702. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  703. })
  704. .catch(err => {
  705. next(err);
  706. });
  707. }
  708. if (station.type === "official" && station.playlist.length > 0) {
  709. return async.doUntil(
  710. next => {
  711. if (station.currentSongIndex < station.playlist.length - 1) {
  712. SongsModule.runJob(
  713. "GET_SONG",
  714. {
  715. id: station.playlist[station.currentSongIndex + 1]
  716. },
  717. this
  718. )
  719. .then(response => next(null, response.song, station.currentSongIndex + 1))
  720. .catch(() => {
  721. station.currentSongIndex += 1;
  722. next(null, null, null);
  723. });
  724. } else {
  725. StationsModule.runJob(
  726. "CALCULATE_SONG_FOR_STATION",
  727. {
  728. station
  729. },
  730. this
  731. )
  732. .then(newPlaylist => {
  733. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  734. .then(response => {
  735. station.playlist = newPlaylist;
  736. next(null, response.song, 0);
  737. })
  738. .catch(() => next(null, StationsModule.defaultSong, 0));
  739. })
  740. .catch(() => {
  741. next(null, StationsModule.defaultSong, 0);
  742. });
  743. }
  744. },
  745. (song, currentSongIndex, next) => {
  746. if (song) return next(null, true, currentSongIndex);
  747. return next(null, false);
  748. },
  749. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  750. );
  751. }
  752. },
  753. (song, currentSongIndex, station, next) => {
  754. const $set = {};
  755. if (song === null) $set.currentSong = null;
  756. else if (song.likes === -1 && song.dislikes === -1) {
  757. $set.currentSong = {
  758. songId: song.songId,
  759. title: song.title,
  760. duration: song.duration,
  761. skipDuration: 0,
  762. likes: -1,
  763. dislikes: -1
  764. };
  765. } else {
  766. $set.currentSong = {
  767. songId: song.songId,
  768. title: song.title,
  769. artists: song.artists,
  770. duration: song.duration,
  771. likes: song.likes,
  772. dislikes: song.dislikes,
  773. skipDuration: song.skipDuration,
  774. thumbnail: song.thumbnail
  775. };
  776. }
  777. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  778. $set.startedAt = Date.now();
  779. $set.timePaused = 0;
  780. if (station.paused) $set.pausedAt = Date.now();
  781. next(null, $set, station);
  782. },
  783. ($set, station, next) => {
  784. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  785. StationsModule.runJob(
  786. "UPDATE_STATION",
  787. {
  788. stationId: station._id
  789. },
  790. this
  791. )
  792. .then(station => {
  793. if (station.type === "community" && station.partyMode === true)
  794. CacheModule.runJob("PUB", {
  795. channel: "station.queueUpdate",
  796. value: payload.stationId
  797. })
  798. .then()
  799. .catch();
  800. next(null, station);
  801. })
  802. .catch(next);
  803. });
  804. }
  805. ],
  806. async (err, station) => {
  807. if (err) {
  808. err = await UtilsModule.runJob(
  809. "GET_ERROR",
  810. {
  811. error: err
  812. },
  813. this
  814. );
  815. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  816. reject(new Error(err));
  817. } else {
  818. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  819. station.currentSong.skipVotes = 0;
  820. }
  821. // TODO Pub/Sub this
  822. IOModule.runJob("EMIT_TO_ROOM", {
  823. room: `station.${station._id}`,
  824. args: [
  825. "event:songs.next",
  826. {
  827. currentSong: station.currentSong,
  828. startedAt: station.startedAt,
  829. paused: station.paused,
  830. timePaused: 0
  831. }
  832. ]
  833. })
  834. .then()
  835. .catch();
  836. if (station.privacy === "public") {
  837. IOModule.runJob("EMIT_TO_ROOM", {
  838. room: "home",
  839. args: ["event:station.nextSong", station._id, station.currentSong]
  840. })
  841. .then()
  842. .catch();
  843. } else {
  844. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  845. Object.keys(sockets).forEach(socketKey => {
  846. const socket = sockets[socketKey];
  847. const { session } = socket;
  848. if (session.sessionId) {
  849. CacheModule.runJob(
  850. "HGET",
  851. {
  852. table: "sessions",
  853. key: session.sessionId
  854. },
  855. this
  856. // eslint-disable-next-line no-loop-func
  857. ).then(session => {
  858. if (session) {
  859. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  860. userModel => {
  861. userModel.findOne(
  862. {
  863. _id: session.userId
  864. },
  865. (err, user) => {
  866. if (!err && user) {
  867. if (user.role === "admin")
  868. socket.emit(
  869. "event:station.nextSong",
  870. station._id,
  871. station.currentSong
  872. );
  873. else if (
  874. station.type === "community" &&
  875. station.owner === session.userId
  876. )
  877. socket.emit(
  878. "event:station.nextSong",
  879. station._id,
  880. station.currentSong
  881. );
  882. }
  883. }
  884. );
  885. }
  886. );
  887. }
  888. });
  889. }
  890. });
  891. }
  892. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  893. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  894. sockets: await IOModule.runJob(
  895. "GET_ROOM_SOCKETS",
  896. {
  897. room: `station.${station._id}`
  898. },
  899. this
  900. ),
  901. room: `song.${station.currentSong.songId}`
  902. });
  903. if (!station.paused) {
  904. NotificationsModule.runJob("SCHEDULE", {
  905. name: `stations.nextSong?id=${station._id}`,
  906. time: station.currentSong.duration * 1000,
  907. station
  908. });
  909. }
  910. } else {
  911. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  912. sockets: await IOModule.runJob(
  913. "GET_ROOM_SOCKETS",
  914. {
  915. room: `station.${station._id}`
  916. },
  917. this
  918. )
  919. })
  920. .then()
  921. .catch();
  922. }
  923. resolve({ station });
  924. }
  925. }
  926. );
  927. });
  928. }
  929. /**
  930. * Checks if a user can view/access a station
  931. *
  932. * @param {object} payload - object that contains the payload
  933. * @param {object} payload.station - the station object of the station in question
  934. * @param {string} payload.userId - the id of the user in question
  935. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  936. * @returns {Promise} - returns a promise (resolve, reject)
  937. */
  938. CAN_USER_VIEW_STATION(payload) {
  939. return new Promise((resolve, reject) => {
  940. async.waterfall(
  941. [
  942. next => {
  943. if (payload.station.privacy === "public") return next(true);
  944. if (payload.station.privacy === "unlisted")
  945. if (payload.hideUnlisted === true) return next();
  946. else return next(true);
  947. if (!payload.userId) return next("Not allowed");
  948. return next();
  949. },
  950. next => {
  951. DBModule.runJob(
  952. "GET_MODEL",
  953. {
  954. modelName: "user"
  955. },
  956. this
  957. ).then(userModel => {
  958. userModel.findOne({ _id: payload.userId }, next);
  959. });
  960. },
  961. (user, next) => {
  962. if (!user) return next("Not allowed");
  963. if (user.role === "admin") return next(true);
  964. if (payload.station.type === "official") return next("Not allowed");
  965. if (payload.station.owner === payload.userId) return next(true);
  966. return next("Not allowed");
  967. }
  968. ],
  969. async errOrResult => {
  970. if (errOrResult !== true && errOrResult !== "Not allowed") {
  971. errOrResult = await UtilsModule.runJob(
  972. "GET_ERROR",
  973. {
  974. error: errOrResult
  975. },
  976. this
  977. );
  978. reject(new Error(errOrResult));
  979. } else {
  980. resolve(errOrResult === true);
  981. }
  982. }
  983. );
  984. });
  985. }
  986. }
  987. export default new _StationsModule();