stations.js 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let IOModule;
  8. let SongsModule;
  9. let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("stations");
  14. StationsModule = this;
  15. }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. IOModule = this.moduleManager.modules.io;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1,
  35. requestedAt: Date.now()
  36. };
  37. this.userList = {};
  38. this.usersPerStation = {};
  39. this.usersPerStationCount = {};
  40. // TEMP
  41. CacheModule.runJob("SUB", {
  42. channel: "station.pause",
  43. cb: async stationId => {
  44. NotificationsModule.runJob("REMOVE", {
  45. subscription: `stations.nextSong?id=${stationId}`
  46. }).then();
  47. }
  48. });
  49. CacheModule.runJob("SUB", {
  50. channel: "station.resume",
  51. cb: async stationId => {
  52. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  53. }
  54. });
  55. CacheModule.runJob("SUB", {
  56. channel: "station.queueUpdate",
  57. cb: async stationId => {
  58. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  59. if (!station.currentSong && station.queue.length > 0) {
  60. StationsModule.runJob("INITIALIZE_STATION", {
  61. stationId
  62. }).then();
  63. }
  64. });
  65. }
  66. });
  67. CacheModule.runJob("SUB", {
  68. channel: "station.newOfficialPlaylist",
  69. cb: async stationId => {
  70. CacheModule.runJob("HGET", {
  71. table: "officialPlaylists",
  72. key: stationId
  73. }).then(playlistObj => {
  74. if (playlistObj) {
  75. IOModule.runJob("EMIT_TO_ROOM", {
  76. room: `station.${stationId}`,
  77. args: ["event:newOfficialPlaylist", playlistObj.songs]
  78. });
  79. }
  80. });
  81. }
  82. });
  83. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  84. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  85. return new Promise((resolve, reject) =>
  86. async.waterfall(
  87. [
  88. next => {
  89. this.setStage(2);
  90. CacheModule.runJob("HGETALL", { table: "stations" })
  91. .then(stations => {
  92. next(null, stations);
  93. })
  94. .catch(next);
  95. },
  96. (stations, next) => {
  97. this.setStage(3);
  98. if (!stations) return next();
  99. const stationIds = Object.keys(stations);
  100. return async.each(
  101. stationIds,
  102. (stationId, next) => {
  103. stationModel.findOne({ _id: stationId }, (err, station) => {
  104. if (err) next(err);
  105. else if (!station) {
  106. CacheModule.runJob("HDEL", {
  107. table: "stations",
  108. key: stationId
  109. })
  110. .then(() => {
  111. next();
  112. })
  113. .catch(next);
  114. } else next();
  115. });
  116. },
  117. next
  118. );
  119. },
  120. next => {
  121. this.setStage(4);
  122. stationModel.find({}, next);
  123. },
  124. (stations, next) => {
  125. this.setStage(5);
  126. async.each(
  127. stations,
  128. (station, next2) => {
  129. async.waterfall(
  130. [
  131. next => {
  132. CacheModule.runJob("HSET", {
  133. table: "stations",
  134. key: station._id,
  135. value: stationSchema(station)
  136. })
  137. .then(station => next(null, station))
  138. .catch(next);
  139. },
  140. (station, next) => {
  141. StationsModule.runJob(
  142. "INITIALIZE_STATION",
  143. {
  144. stationId: station._id
  145. },
  146. null,
  147. -1
  148. )
  149. .then(() => {
  150. next();
  151. })
  152. .catch(next);
  153. }
  154. ],
  155. err => {
  156. next2(err);
  157. }
  158. );
  159. },
  160. next
  161. );
  162. }
  163. ],
  164. async err => {
  165. if (err) {
  166. err = await UtilsModule.runJob("GET_ERROR", {
  167. error: err
  168. });
  169. reject(new Error(err));
  170. } else {
  171. resolve();
  172. }
  173. }
  174. )
  175. );
  176. }
  177. /**
  178. * Initialises a station
  179. *
  180. * @param {object} payload - object that contains the payload
  181. * @param {string} payload.stationId - id of the station to initialise
  182. * @returns {Promise} - returns a promise (resolve, reject)
  183. */
  184. INITIALIZE_STATION(payload) {
  185. return new Promise((resolve, reject) => {
  186. // if (typeof cb !== 'function') cb = ()=>{};
  187. async.waterfall(
  188. [
  189. next => {
  190. StationsModule.runJob(
  191. "GET_STATION",
  192. {
  193. stationId: payload.stationId
  194. },
  195. this
  196. )
  197. .then(station => {
  198. next(null, station);
  199. })
  200. .catch(next);
  201. },
  202. (station, next) => {
  203. if (!station) return next("Station not found.");
  204. return NotificationsModule.runJob(
  205. "UNSCHEDULE",
  206. {
  207. name: `stations.nextSong?id=${station._id}`
  208. },
  209. this
  210. )
  211. .then()
  212. .catch()
  213. .finally(() => {
  214. NotificationsModule.runJob("SUBSCRIBE", {
  215. name: `stations.nextSong?id=${station._id}`,
  216. cb: () =>
  217. StationsModule.runJob("SKIP_STATION", {
  218. stationId: station._id
  219. }),
  220. unique: true,
  221. station
  222. })
  223. .then()
  224. .catch();
  225. if (station.paused) return next(true, station);
  226. return next(null, station);
  227. });
  228. },
  229. (station, next) => {
  230. if (!station.currentSong) {
  231. return StationsModule.runJob(
  232. "SKIP_STATION",
  233. {
  234. stationId: station._id
  235. },
  236. this
  237. )
  238. .then(station => {
  239. next(true, station);
  240. })
  241. .catch(next)
  242. .finally(() => {});
  243. }
  244. let timeLeft =
  245. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  246. if (Number.isNaN(timeLeft)) timeLeft = -1;
  247. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  248. return StationsModule.runJob(
  249. "SKIP_STATION",
  250. {
  251. stationId: station._id
  252. },
  253. this
  254. )
  255. .then(station => {
  256. next(null, station);
  257. })
  258. .catch(next);
  259. }
  260. // name, time, cb, station
  261. NotificationsModule.runJob("SCHEDULE", {
  262. name: `stations.nextSong?id=${station._id}`,
  263. time: timeLeft,
  264. station
  265. });
  266. return next(null, station);
  267. }
  268. ],
  269. async (err, station) => {
  270. if (err && err !== true) {
  271. err = await UtilsModule.runJob(
  272. "GET_ERROR",
  273. {
  274. error: err
  275. },
  276. this
  277. );
  278. reject(new Error(err));
  279. } else resolve(station);
  280. }
  281. );
  282. });
  283. }
  284. /**
  285. * Calculates the next song for the station
  286. *
  287. * @param {object} payload - object that contains the payload
  288. * @param {string} payload.station - station object to calculate song for
  289. * @returns {Promise} - returns a promise (resolve, reject)
  290. */
  291. async CALCULATE_SONG_FOR_STATION(payload) {
  292. // station, bypassValidate = false
  293. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  294. return new Promise((resolve, reject) => {
  295. const songList = [];
  296. return async.waterfall(
  297. [
  298. next => {
  299. if (payload.station.genres.length === 0) return next();
  300. const genresDone = [];
  301. const blacklistedGenres = payload.station.blacklistedGenres.map(blacklistedGenre =>
  302. blacklistedGenre.toLowerCase()
  303. );
  304. return payload.station.genres.forEach(genre => {
  305. songModel.find({ genres: { $regex: genre, $options: "i" } }, (err, songs) => {
  306. if (!err) {
  307. songs.forEach(song => {
  308. if (songList.indexOf(song._id) === -1) {
  309. let found = false;
  310. song.genres.forEach(songGenre => {
  311. if (blacklistedGenres.indexOf(songGenre.toLowerCase()) !== -1)
  312. found = true;
  313. });
  314. if (!found) {
  315. songList.push(song._id);
  316. }
  317. }
  318. });
  319. }
  320. genresDone.push(genre);
  321. if (genresDone.length === payload.station.genres.length) next();
  322. });
  323. });
  324. },
  325. next => {
  326. const playlist = [];
  327. songList.forEach(songId => {
  328. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  329. });
  330. // eslint-disable-next-line array-callback-return
  331. payload.station.playlist.filter(songId => {
  332. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  333. });
  334. UtilsModule.runJob("SHUFFLE", { array: playlist })
  335. .then(result => {
  336. next(null, result.array);
  337. }, this)
  338. .catch(next);
  339. },
  340. (playlist, next) => {
  341. StationsModule.runJob(
  342. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  343. {
  344. stationId: payload.station._id,
  345. songList: playlist
  346. },
  347. this
  348. )
  349. .then(() => {
  350. next(null, playlist);
  351. })
  352. .catch(next);
  353. },
  354. (playlist, next) => {
  355. StationsModule.stationModel.updateOne(
  356. { _id: payload.station._id },
  357. { $set: { playlist } },
  358. { runValidators: true },
  359. () => {
  360. StationsModule.runJob(
  361. "UPDATE_STATION",
  362. {
  363. stationId: payload.station._id
  364. },
  365. this
  366. )
  367. .then(() => {
  368. next(null, playlist);
  369. })
  370. .catch(next);
  371. }
  372. );
  373. }
  374. ],
  375. (err, newPlaylist) => {
  376. if (err) return reject(new Error(err));
  377. return resolve(newPlaylist);
  378. }
  379. );
  380. });
  381. }
  382. /**
  383. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  384. *
  385. * @param {object} payload - object that contains the payload
  386. * @param {string} payload.stationId - id of the station
  387. * @returns {Promise} - returns a promise (resolve, reject)
  388. */
  389. GET_STATION(payload) {
  390. return new Promise((resolve, reject) => {
  391. async.waterfall(
  392. [
  393. next => {
  394. CacheModule.runJob(
  395. "HGET",
  396. {
  397. table: "stations",
  398. key: payload.stationId
  399. },
  400. this
  401. )
  402. .then(station => {
  403. next(null, station);
  404. })
  405. .catch(next);
  406. },
  407. (station, next) => {
  408. if (station) return next(true, station);
  409. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  410. },
  411. (station, next) => {
  412. if (station) {
  413. if (station.type === "official") {
  414. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  415. stationId: station._id,
  416. songList: station.playlist
  417. })
  418. .then()
  419. .catch();
  420. }
  421. station = StationsModule.stationSchema(station);
  422. CacheModule.runJob("HSET", {
  423. table: "stations",
  424. key: payload.stationId,
  425. value: station
  426. })
  427. .then()
  428. .catch();
  429. next(true, station);
  430. } else next("Station not found");
  431. }
  432. ],
  433. async (err, station) => {
  434. if (err && err !== true) {
  435. err = await UtilsModule.runJob(
  436. "GET_ERROR",
  437. {
  438. error: err
  439. },
  440. this
  441. );
  442. reject(new Error(err));
  443. } else resolve(station);
  444. }
  445. );
  446. });
  447. }
  448. /**
  449. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  450. *
  451. * @param {object} payload - object that contains the payload
  452. * @param {string} payload.stationName - the unique name of the station
  453. * @returns {Promise} - returns a promise (resolve, reject)
  454. */
  455. async GET_STATION_BY_NAME(payload) {
  456. return new Promise((resolve, reject) =>
  457. async.waterfall(
  458. [
  459. next => {
  460. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  461. },
  462. (station, next) => {
  463. if (station) {
  464. if (station.type === "official") {
  465. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  466. stationId: station._id,
  467. songList: station.playlist
  468. });
  469. }
  470. station = StationsModule.stationSchema(station);
  471. CacheModule.runJob("HSET", {
  472. table: "stations",
  473. key: station._id,
  474. value: station
  475. });
  476. next(true, station);
  477. } else next("Station not found");
  478. }
  479. ],
  480. (err, station) => {
  481. if (err && err !== true) return reject(new Error(err));
  482. return resolve(station);
  483. }
  484. )
  485. );
  486. }
  487. /**
  488. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  489. *
  490. * @param {object} payload - object that contains the payload
  491. * @param {string} payload.stationId - the id of the station to update
  492. * @returns {Promise} - returns a promise (resolve, reject)
  493. */
  494. UPDATE_STATION(payload) {
  495. return new Promise((resolve, reject) => {
  496. async.waterfall(
  497. [
  498. next => {
  499. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  500. },
  501. (station, next) => {
  502. if (!station) {
  503. CacheModule.runJob("HDEL", {
  504. table: "stations",
  505. key: payload.stationId
  506. })
  507. .then()
  508. .catch();
  509. return next("Station not found");
  510. }
  511. return CacheModule.runJob(
  512. "HSET",
  513. {
  514. table: "stations",
  515. key: payload.stationId,
  516. value: station
  517. },
  518. this
  519. )
  520. .then(station => {
  521. next(null, station);
  522. })
  523. .catch(next);
  524. }
  525. ],
  526. async (err, station) => {
  527. if (err && err !== true) {
  528. err = await UtilsModule.runJob(
  529. "GET_ERROR",
  530. {
  531. error: err
  532. },
  533. this
  534. );
  535. reject(new Error(err));
  536. } else resolve(station);
  537. }
  538. );
  539. });
  540. }
  541. /**
  542. * Creates the official playlist for a station
  543. *
  544. * @param {object} payload - object that contains the payload
  545. * @param {string} payload.stationId - the id of the station
  546. * @param {Array} payload.songList - list of songs to put in official playlist
  547. * @returns {Promise} - returns a promise (resolve, reject)
  548. */
  549. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  550. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  551. console.log(typeof payload.songList, payload.songList);
  552. return new Promise(resolve => {
  553. const lessInfoPlaylist = [];
  554. return async.each(
  555. payload.songList,
  556. (song, next) => {
  557. SongsModule.runJob("GET_SONG", { id: song }, this)
  558. .then(response => {
  559. const { song } = response;
  560. if (song) {
  561. const newSong = {
  562. songId: song.songId,
  563. title: song.title,
  564. artists: song.artists,
  565. duration: song.duration,
  566. thumbnail: song.thumbnail,
  567. requestedAt: song.requestedAt
  568. };
  569. lessInfoPlaylist.push(newSong);
  570. }
  571. })
  572. .finally(() => {
  573. next();
  574. });
  575. },
  576. () => {
  577. CacheModule.runJob(
  578. "HSET",
  579. {
  580. table: "officialPlaylists",
  581. key: payload.stationId,
  582. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  583. },
  584. this
  585. ).finally(() => {
  586. CacheModule.runJob("PUB", {
  587. channel: "station.newOfficialPlaylist",
  588. value: payload.stationId
  589. });
  590. resolve();
  591. });
  592. }
  593. );
  594. });
  595. }
  596. /**
  597. * Skips a station
  598. *
  599. * @param {object} payload - object that contains the payload
  600. * @param {string} payload.stationId - the id of the station to skip
  601. * @returns {Promise} - returns a promise (resolve, reject)
  602. */
  603. SKIP_STATION(payload) {
  604. return new Promise((resolve, reject) => {
  605. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  606. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  607. async.waterfall(
  608. [
  609. next => {
  610. NotificationsModule.runJob("UNSCHEDULE", {
  611. name: `stations.nextSong?id=${payload.stationId}`
  612. })
  613. .then(() => {
  614. next();
  615. })
  616. .catch(next);
  617. },
  618. next => {
  619. StationsModule.runJob(
  620. "GET_STATION",
  621. {
  622. stationId: payload.stationId
  623. },
  624. this
  625. )
  626. .then(station => {
  627. next(null, station);
  628. })
  629. .catch(() => {});
  630. },
  631. // eslint-disable-next-line consistent-return
  632. (station, next) => {
  633. if (!station) return next("Station not found.");
  634. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  635. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  636. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  637. // Community station with party mode enabled and songs in the queue
  638. if (station.paused) return next(null, null, -19, station);
  639. return StationsModule.stationModel.updateOne(
  640. { _id: payload.stationId },
  641. {
  642. $pull: {
  643. queue: {
  644. _id: station.queue[0]._id
  645. }
  646. }
  647. },
  648. err => {
  649. if (err) return next(err);
  650. return next(null, station.queue[0], -12, station);
  651. }
  652. );
  653. }
  654. if (station.type === "community" && !station.partyMode) {
  655. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  656. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  657. if (err) return next(err);
  658. if (!playlist) return next(null, null, -13, station);
  659. playlist = playlist.songs;
  660. if (playlist.length > 0) {
  661. let currentSongIndex;
  662. if (station.currentSongIndex < playlist.length - 1)
  663. currentSongIndex = station.currentSongIndex + 1;
  664. else currentSongIndex = 0;
  665. const callback = (err, song) => {
  666. if (err) return next(err);
  667. if (song) return next(null, song, currentSongIndex, station);
  668. const currentSong = {
  669. songId: playlist[currentSongIndex].songId,
  670. title: playlist[currentSongIndex].title,
  671. duration: playlist[currentSongIndex].duration,
  672. likes: -1,
  673. dislikes: -1,
  674. requestedAt: playlist[currentSongIndex].requestedAt
  675. };
  676. return next(null, currentSong, currentSongIndex, station);
  677. };
  678. if (playlist[currentSongIndex]._id)
  679. return SongsModule.runJob(
  680. "GET_SONG",
  681. {
  682. id: playlist[currentSongIndex]._id
  683. },
  684. this
  685. )
  686. .then(response => callback(null, response.song))
  687. .catch(callback);
  688. return SongsModule.runJob(
  689. "GET_SONG_FROM_ID",
  690. {
  691. songId: playlist[currentSongIndex].songId
  692. },
  693. this
  694. )
  695. .then(response => callback(null, response.song))
  696. .catch(callback);
  697. }
  698. return next(null, null, -14, station);
  699. })
  700. );
  701. }
  702. if (station.type === "official" && station.playlist.length === 0) {
  703. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  704. .then(playlist => {
  705. if (playlist.length === 0)
  706. return next(null, StationsModule.defaultSong, 0, station);
  707. return SongsModule.runJob(
  708. "GET_SONG",
  709. {
  710. id: playlist[0]
  711. },
  712. this
  713. )
  714. .then(response => {
  715. next(null, response.song, 0, station);
  716. })
  717. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  718. })
  719. .catch(err => {
  720. next(err);
  721. });
  722. }
  723. if (station.type === "official" && station.playlist.length > 0) {
  724. return async.doUntil(
  725. next => {
  726. if (station.currentSongIndex < station.playlist.length - 1) {
  727. SongsModule.runJob(
  728. "GET_SONG",
  729. {
  730. id: station.playlist[station.currentSongIndex + 1]
  731. },
  732. this
  733. )
  734. .then(response => next(null, response.song, station.currentSongIndex + 1))
  735. .catch(() => {
  736. station.currentSongIndex += 1;
  737. next(null, null, null);
  738. });
  739. } else {
  740. StationsModule.runJob(
  741. "CALCULATE_SONG_FOR_STATION",
  742. {
  743. station
  744. },
  745. this
  746. )
  747. .then(newPlaylist => {
  748. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  749. .then(response => {
  750. station.playlist = newPlaylist;
  751. next(null, response.song, 0);
  752. })
  753. .catch(() => next(null, StationsModule.defaultSong, 0));
  754. })
  755. .catch(() => {
  756. next(null, StationsModule.defaultSong, 0);
  757. });
  758. }
  759. },
  760. (song, currentSongIndex, next) => {
  761. if (song) return next(null, true, currentSongIndex);
  762. return next(null, false);
  763. },
  764. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  765. );
  766. }
  767. },
  768. (song, currentSongIndex, station, next) => {
  769. const $set = {};
  770. if (song === null) $set.currentSong = null;
  771. else if (song.likes === -1 && song.dislikes === -1) {
  772. $set.currentSong = {
  773. songId: song.songId,
  774. title: song.title,
  775. duration: song.duration,
  776. skipDuration: 0,
  777. likes: -1,
  778. dislikes: -1,
  779. requestedAt: song.requestedAt
  780. };
  781. } else {
  782. $set.currentSong = {
  783. songId: song.songId,
  784. title: song.title,
  785. artists: song.artists,
  786. duration: song.duration,
  787. likes: song.likes,
  788. dislikes: song.dislikes,
  789. skipDuration: song.skipDuration,
  790. thumbnail: song.thumbnail,
  791. requestedAt: song.requestedAt
  792. };
  793. }
  794. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  795. $set.startedAt = Date.now();
  796. $set.timePaused = 0;
  797. if (station.paused) $set.pausedAt = Date.now();
  798. next(null, $set, station);
  799. },
  800. ($set, station, next) => {
  801. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, err => {
  802. if (err) return next(err);
  803. return StationsModule.runJob(
  804. "UPDATE_STATION",
  805. {
  806. stationId: station._id
  807. },
  808. this
  809. )
  810. .then(station => {
  811. if (station.type === "community" && station.partyMode === true)
  812. CacheModule.runJob("PUB", {
  813. channel: "station.queueUpdate",
  814. value: payload.stationId
  815. })
  816. .then()
  817. .catch();
  818. next(null, station);
  819. })
  820. .catch(next);
  821. });
  822. }
  823. ],
  824. async (err, station) => {
  825. if (err) {
  826. err = await UtilsModule.runJob(
  827. "GET_ERROR",
  828. {
  829. error: err
  830. },
  831. this
  832. );
  833. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  834. reject(new Error(err));
  835. } else {
  836. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  837. station.currentSong.skipVotes = 0;
  838. }
  839. // TODO Pub/Sub this
  840. IOModule.runJob("EMIT_TO_ROOM", {
  841. room: `station.${station._id}`,
  842. args: [
  843. "event:songs.next",
  844. {
  845. currentSong: station.currentSong,
  846. startedAt: station.startedAt,
  847. paused: station.paused,
  848. timePaused: 0
  849. }
  850. ]
  851. })
  852. .then()
  853. .catch();
  854. if (station.privacy === "public") {
  855. IOModule.runJob("EMIT_TO_ROOM", {
  856. room: "home",
  857. args: ["event:station.nextSong", station._id, station.currentSong]
  858. })
  859. .then()
  860. .catch();
  861. } else {
  862. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  863. Object.keys(sockets).forEach(socketKey => {
  864. const socket = sockets[socketKey];
  865. const { session } = socket;
  866. if (session.sessionId) {
  867. CacheModule.runJob(
  868. "HGET",
  869. {
  870. table: "sessions",
  871. key: session.sessionId
  872. },
  873. this
  874. // eslint-disable-next-line no-loop-func
  875. ).then(session => {
  876. if (session) {
  877. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  878. userModel => {
  879. userModel.findOne(
  880. {
  881. _id: session.userId
  882. },
  883. (err, user) => {
  884. if (!err && user) {
  885. if (user.role === "admin")
  886. socket.emit(
  887. "event:station.nextSong",
  888. station._id,
  889. station.currentSong
  890. );
  891. else if (
  892. station.type === "community" &&
  893. station.owner === session.userId
  894. )
  895. socket.emit(
  896. "event:station.nextSong",
  897. station._id,
  898. station.currentSong
  899. );
  900. }
  901. }
  902. );
  903. }
  904. );
  905. }
  906. });
  907. }
  908. });
  909. }
  910. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  911. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  912. sockets: await IOModule.runJob(
  913. "GET_ROOM_SOCKETS",
  914. {
  915. room: `station.${station._id}`
  916. },
  917. this
  918. ),
  919. room: `song.${station.currentSong.songId}`
  920. });
  921. if (!station.paused) {
  922. NotificationsModule.runJob("SCHEDULE", {
  923. name: `stations.nextSong?id=${station._id}`,
  924. time: station.currentSong.duration * 1000,
  925. station
  926. });
  927. }
  928. } else {
  929. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  930. sockets: await IOModule.runJob(
  931. "GET_ROOM_SOCKETS",
  932. {
  933. room: `station.${station._id}`
  934. },
  935. this
  936. )
  937. })
  938. .then()
  939. .catch();
  940. }
  941. resolve({ station });
  942. }
  943. }
  944. );
  945. });
  946. }
  947. /**
  948. * Checks if a user can view/access a station
  949. *
  950. * @param {object} payload - object that contains the payload
  951. * @param {object} payload.station - the station object of the station in question
  952. * @param {string} payload.userId - the id of the user in question
  953. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  954. * @returns {Promise} - returns a promise (resolve, reject)
  955. */
  956. CAN_USER_VIEW_STATION(payload) {
  957. return new Promise((resolve, reject) => {
  958. async.waterfall(
  959. [
  960. next => {
  961. if (payload.station.privacy === "public") return next(true);
  962. if (payload.station.privacy === "unlisted")
  963. if (payload.hideUnlisted === true) return next();
  964. else return next(true);
  965. if (!payload.userId) return next("Not allowed");
  966. return next();
  967. },
  968. next => {
  969. DBModule.runJob(
  970. "GET_MODEL",
  971. {
  972. modelName: "user"
  973. },
  974. this
  975. ).then(userModel => {
  976. userModel.findOne({ _id: payload.userId }, next);
  977. });
  978. },
  979. (user, next) => {
  980. if (!user) return next("Not allowed");
  981. if (user.role === "admin") return next(true);
  982. if (payload.station.type === "official") return next("Not allowed");
  983. if (payload.station.owner === payload.userId) return next(true);
  984. return next("Not allowed");
  985. }
  986. ],
  987. async errOrResult => {
  988. if (errOrResult !== true && errOrResult !== "Not allowed") {
  989. errOrResult = await UtilsModule.runJob(
  990. "GET_ERROR",
  991. {
  992. error: errOrResult
  993. },
  994. this
  995. );
  996. reject(new Error(errOrResult));
  997. } else {
  998. resolve(errOrResult === true);
  999. }
  1000. }
  1001. );
  1002. });
  1003. }
  1004. /**
  1005. * Returns a list of sockets in a room that can and can't know about a station
  1006. *
  1007. * @param {object} payload - the payload object
  1008. * @param {object} payload.station - the station object
  1009. * @param {string} payload.room - the socket.io room to get the sockets from
  1010. * @returns {Promise} - returns a promise (resolve, reject)
  1011. */
  1012. GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION(payload) {
  1013. return new Promise((resolve, reject) => {
  1014. IOModule.runJob("GET_ROOM_SOCKETS", { room: payload.room }, this)
  1015. .then(socketsObject => {
  1016. const sockets = Object.keys(socketsObject).map(socketKey => socketsObject[socketKey]);
  1017. let socketsThatCan = [];
  1018. const socketsThatCannot = [];
  1019. if (payload.station.privacy === "public") {
  1020. socketsThatCan = sockets;
  1021. resolve({ socketsThatCan, socketsThatCannot });
  1022. } else {
  1023. async.eachLimit(
  1024. sockets,
  1025. 1,
  1026. (socket, next) => {
  1027. const { session } = socket;
  1028. async.waterfall(
  1029. [
  1030. next => {
  1031. if (!session.sessionId) next("No session id");
  1032. else next();
  1033. },
  1034. next => {
  1035. CacheModule.runJob(
  1036. "HGET",
  1037. {
  1038. table: "sessions",
  1039. key: session.sessionId
  1040. },
  1041. this
  1042. )
  1043. .then(response => {
  1044. next(null, response);
  1045. })
  1046. .catch(next);
  1047. },
  1048. (session, next) => {
  1049. if (!session) next("No session");
  1050. else {
  1051. DBModule.runJob("GET_MODEL", { modelName: "user" }, this)
  1052. .then(userModel => {
  1053. next(null, userModel);
  1054. })
  1055. .catch(next);
  1056. }
  1057. },
  1058. (userModel, next) => {
  1059. if (!userModel) next("No user model");
  1060. else
  1061. userModel.findOne(
  1062. {
  1063. _id: session.userId
  1064. },
  1065. next
  1066. );
  1067. },
  1068. (user, next) => {
  1069. if (!user) next("No user found");
  1070. else if (user.role === "admin") {
  1071. socketsThatCan.push(socket);
  1072. next();
  1073. } else if (
  1074. payload.station.type === "community" &&
  1075. payload.station.owner === session.userId
  1076. ) {
  1077. socketsThatCan.push(socket);
  1078. next();
  1079. }
  1080. }
  1081. ],
  1082. err => {
  1083. if (err) socketsThatCannot.push(socket);
  1084. next();
  1085. }
  1086. );
  1087. },
  1088. err => {
  1089. if (err) reject(err);
  1090. else resolve({ socketsThatCan, socketsThatCannot });
  1091. }
  1092. );
  1093. }
  1094. })
  1095. .catch(reject);
  1096. });
  1097. }
  1098. }
  // A single instance of the module is created at import time and shared as the default export.
  const stationsModuleInstance = new _StationsModule();

  export default stationsModuleInstance;