stations.js 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let IOModule;
  8. let SongsModule;
  9. let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("stations");
  14. StationsModule = this;
  15. }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. IOModule = this.moduleManager.modules.io;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1,
  35. requestedAt: Date.now()
  36. };
  37. this.userList = {};
  38. this.usersPerStation = {};
  39. this.usersPerStationCount = {};
  40. // TEMP
  41. CacheModule.runJob("SUB", {
  42. channel: "station.pause",
  43. cb: async stationId => {
  44. NotificationsModule.runJob("REMOVE", {
  45. subscription: `stations.nextSong?id=${stationId}`
  46. }).then();
  47. }
  48. });
  49. CacheModule.runJob("SUB", {
  50. channel: "station.resume",
  51. cb: async stationId => {
  52. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  53. }
  54. });
  55. CacheModule.runJob("SUB", {
  56. channel: "station.queueUpdate",
  57. cb: async stationId => {
  58. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  59. if (!station.currentSong && station.queue.length > 0) {
  60. StationsModule.runJob("INITIALIZE_STATION", {
  61. stationId
  62. }).then();
  63. }
  64. });
  65. }
  66. });
  67. CacheModule.runJob("SUB", {
  68. channel: "station.newOfficialPlaylist",
  69. cb: async stationId => {
  70. CacheModule.runJob("HGET", {
  71. table: "officialPlaylists",
  72. key: stationId
  73. }).then(playlistObj => {
  74. if (playlistObj) {
  75. IOModule.runJob("EMIT_TO_ROOM", {
  76. room: `station.${stationId}`,
  77. args: ["event:newOfficialPlaylist", playlistObj.songs]
  78. });
  79. }
  80. });
  81. }
  82. });
  83. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  84. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  85. return new Promise((resolve, reject) =>
  86. async.waterfall(
  87. [
  88. next => {
  89. this.setStage(2);
  90. CacheModule.runJob("HGETALL", { table: "stations" })
  91. .then(stations => {
  92. next(null, stations);
  93. })
  94. .catch(next);
  95. },
  96. (stations, next) => {
  97. this.setStage(3);
  98. if (!stations) return next();
  99. const stationIds = Object.keys(stations);
  100. return async.each(
  101. stationIds,
  102. (stationId, next) => {
  103. stationModel.findOne({ _id: stationId }, (err, station) => {
  104. if (err) next(err);
  105. else if (!station) {
  106. CacheModule.runJob("HDEL", {
  107. table: "stations",
  108. key: stationId
  109. })
  110. .then(() => {
  111. next();
  112. })
  113. .catch(next);
  114. } else next();
  115. });
  116. },
  117. next
  118. );
  119. },
  120. next => {
  121. this.setStage(4);
  122. stationModel.find({}, next);
  123. },
  124. (stations, next) => {
  125. this.setStage(5);
  126. async.each(
  127. stations,
  128. (station, next2) => {
  129. async.waterfall(
  130. [
  131. next => {
  132. CacheModule.runJob("HSET", {
  133. table: "stations",
  134. key: station._id,
  135. value: stationSchema(station)
  136. })
  137. .then(station => next(null, station))
  138. .catch(next);
  139. },
  140. (station, next) => {
  141. StationsModule.runJob(
  142. "INITIALIZE_STATION",
  143. {
  144. stationId: station._id
  145. },
  146. null,
  147. -1
  148. )
  149. .then(() => {
  150. next();
  151. })
  152. .catch(next);
  153. }
  154. ],
  155. err => {
  156. next2(err);
  157. }
  158. );
  159. },
  160. next
  161. );
  162. }
  163. ],
  164. async err => {
  165. if (err) {
  166. err = await UtilsModule.runJob("GET_ERROR", {
  167. error: err
  168. });
  169. reject(new Error(err));
  170. } else {
  171. resolve();
  172. }
  173. }
  174. )
  175. );
  176. }
  177. /**
  178. * Initialises a station
  179. *
  180. * @param {object} payload - object that contains the payload
  181. * @param {string} payload.stationId - id of the station to initialise
  182. * @returns {Promise} - returns a promise (resolve, reject)
  183. */
  184. INITIALIZE_STATION(payload) {
  185. return new Promise((resolve, reject) => {
  186. // if (typeof cb !== 'function') cb = ()=>{};
  187. async.waterfall(
  188. [
  189. next => {
  190. StationsModule.runJob(
  191. "GET_STATION",
  192. {
  193. stationId: payload.stationId
  194. },
  195. this
  196. )
  197. .then(station => {
  198. next(null, station);
  199. })
  200. .catch(next);
  201. },
  202. (station, next) => {
  203. if (!station) return next("Station not found.");
  204. return NotificationsModule.runJob(
  205. "UNSCHEDULE",
  206. {
  207. name: `stations.nextSong?id=${station._id}`
  208. },
  209. this
  210. )
  211. .then()
  212. .catch()
  213. .finally(() => {
  214. NotificationsModule.runJob("SUBSCRIBE", {
  215. name: `stations.nextSong?id=${station._id}`,
  216. cb: () =>
  217. StationsModule.runJob("SKIP_STATION", {
  218. stationId: station._id
  219. }),
  220. unique: true,
  221. station
  222. })
  223. .then()
  224. .catch();
  225. if (station.paused) return next(true, station);
  226. return next(null, station);
  227. });
  228. },
  229. (station, next) => {
  230. if (!station.currentSong) {
  231. return StationsModule.runJob(
  232. "SKIP_STATION",
  233. {
  234. stationId: station._id
  235. },
  236. this
  237. )
  238. .then(station => {
  239. next(true, station);
  240. })
  241. .catch(next)
  242. .finally(() => {});
  243. }
  244. let timeLeft =
  245. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  246. if (Number.isNaN(timeLeft)) timeLeft = -1;
  247. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  248. return StationsModule.runJob(
  249. "SKIP_STATION",
  250. {
  251. stationId: station._id
  252. },
  253. this
  254. )
  255. .then(station => {
  256. next(null, station);
  257. })
  258. .catch(next);
  259. }
  260. // name, time, cb, station
  261. NotificationsModule.runJob("SCHEDULE", {
  262. name: `stations.nextSong?id=${station._id}`,
  263. time: timeLeft,
  264. station
  265. });
  266. return next(null, station);
  267. }
  268. ],
  269. async (err, station) => {
  270. if (err && err !== true) {
  271. err = await UtilsModule.runJob(
  272. "GET_ERROR",
  273. {
  274. error: err
  275. },
  276. this
  277. );
  278. reject(new Error(err));
  279. } else resolve(station);
  280. }
  281. );
  282. });
  283. }
  284. /**
  285. * Calculates the next song for the station
  286. *
  287. * @param {object} payload - object that contains the payload
  288. * @param {string} payload.station - station object to calculate song for
  289. * @returns {Promise} - returns a promise (resolve, reject)
  290. */
  291. async CALCULATE_SONG_FOR_STATION(payload) {
  292. // station, bypassValidate = false
  293. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  294. return new Promise((resolve, reject) => {
  295. const songList = [];
  296. return async.waterfall(
  297. [
  298. next => {
  299. if (payload.station.genres.length === 0) return next();
  300. const genresDone = [];
  301. const blacklistedGenres = payload.station.blacklistedGenres.map(blacklistedGenre =>
  302. blacklistedGenre.toLowerCase()
  303. );
  304. return payload.station.genres.forEach(genre => {
  305. songModel.find({ genres: { $regex: genre, $options: "i" } }, (err, songs) => {
  306. if (!err) {
  307. songs.forEach(song => {
  308. if (songList.indexOf(song._id) === -1) {
  309. let found = false;
  310. song.genres.forEach(songGenre => {
  311. if (blacklistedGenres.indexOf(songGenre.toLowerCase()) !== -1)
  312. found = true;
  313. });
  314. if (!found) {
  315. songList.push(song._id);
  316. }
  317. }
  318. });
  319. }
  320. genresDone.push(genre);
  321. if (genresDone.length === payload.station.genres.length) next();
  322. });
  323. });
  324. },
  325. next => {
  326. const playlist = [];
  327. songList.forEach(songId => {
  328. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  329. });
  330. // eslint-disable-next-line array-callback-return
  331. payload.station.playlist.filter(songId => {
  332. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  333. });
  334. UtilsModule.runJob("SHUFFLE", { array: playlist })
  335. .then(result => {
  336. next(null, result.array);
  337. }, this)
  338. .catch(next);
  339. },
  340. (playlist, next) => {
  341. StationsModule.runJob(
  342. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  343. {
  344. stationId: payload.station._id,
  345. songList: playlist
  346. },
  347. this
  348. )
  349. .then(() => {
  350. next(null, playlist);
  351. })
  352. .catch(next);
  353. },
  354. (playlist, next) => {
  355. StationsModule.stationModel.updateOne(
  356. { _id: payload.station._id },
  357. { $set: { playlist } },
  358. { runValidators: true },
  359. () => {
  360. StationsModule.runJob(
  361. "UPDATE_STATION",
  362. {
  363. stationId: payload.station._id
  364. },
  365. this
  366. )
  367. .then(() => {
  368. next(null, playlist);
  369. })
  370. .catch(next);
  371. }
  372. );
  373. }
  374. ],
  375. (err, newPlaylist) => {
  376. if (err) return reject(new Error(err));
  377. return resolve(newPlaylist);
  378. }
  379. );
  380. });
  381. }
  382. /**
  383. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  384. *
  385. * @param {object} payload - object that contains the payload
  386. * @param {string} payload.stationId - id of the station
  387. * @returns {Promise} - returns a promise (resolve, reject)
  388. */
  389. GET_STATION(payload) {
  390. return new Promise((resolve, reject) => {
  391. async.waterfall(
  392. [
  393. next => {
  394. CacheModule.runJob(
  395. "HGET",
  396. {
  397. table: "stations",
  398. key: payload.stationId
  399. },
  400. this
  401. )
  402. .then(station => {
  403. next(null, station);
  404. })
  405. .catch(next);
  406. },
  407. (station, next) => {
  408. if (station) return next(true, station);
  409. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  410. },
  411. (station, next) => {
  412. if (station) {
  413. if (station.type === "official") {
  414. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  415. stationId: station._id,
  416. songList: station.playlist
  417. })
  418. .then()
  419. .catch();
  420. }
  421. station = StationsModule.stationSchema(station);
  422. CacheModule.runJob("HSET", {
  423. table: "stations",
  424. key: payload.stationId,
  425. value: station
  426. })
  427. .then()
  428. .catch();
  429. next(true, station);
  430. } else next("Station not found");
  431. }
  432. ],
  433. async (err, station) => {
  434. if (err && err !== true) {
  435. err = await UtilsModule.runJob(
  436. "GET_ERROR",
  437. {
  438. error: err
  439. },
  440. this
  441. );
  442. reject(new Error(err));
  443. } else resolve(station);
  444. }
  445. );
  446. });
  447. }
  448. /**
  449. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  450. *
  451. * @param {object} payload - object that contains the payload
  452. * @param {string} payload.stationName - the unique name of the station
  453. * @returns {Promise} - returns a promise (resolve, reject)
  454. */
  455. async GET_STATION_BY_NAME(payload) {
  456. return new Promise((resolve, reject) =>
  457. async.waterfall(
  458. [
  459. next => {
  460. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  461. },
  462. (station, next) => {
  463. if (station) {
  464. if (station.type === "official") {
  465. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  466. stationId: station._id,
  467. songList: station.playlist
  468. });
  469. }
  470. station = StationsModule.stationSchema(station);
  471. CacheModule.runJob("HSET", {
  472. table: "stations",
  473. key: station._id,
  474. value: station
  475. });
  476. next(true, station);
  477. } else next("Station not found");
  478. }
  479. ],
  480. (err, station) => {
  481. if (err && err !== true) return reject(new Error(err));
  482. return resolve(station);
  483. }
  484. )
  485. );
  486. }
  487. /**
  488. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  489. *
  490. * @param {object} payload - object that contains the payload
  491. * @param {string} payload.stationId - the id of the station to update
  492. * @returns {Promise} - returns a promise (resolve, reject)
  493. */
  494. UPDATE_STATION(payload) {
  495. return new Promise((resolve, reject) => {
  496. async.waterfall(
  497. [
  498. next => {
  499. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  500. },
  501. (station, next) => {
  502. if (!station) {
  503. CacheModule.runJob("HDEL", {
  504. table: "stations",
  505. key: payload.stationId
  506. })
  507. .then()
  508. .catch();
  509. return next("Station not found");
  510. }
  511. return CacheModule.runJob(
  512. "HSET",
  513. {
  514. table: "stations",
  515. key: payload.stationId,
  516. value: station
  517. },
  518. this
  519. )
  520. .then(station => {
  521. next(null, station);
  522. })
  523. .catch(next);
  524. }
  525. ],
  526. async (err, station) => {
  527. if (err && err !== true) {
  528. err = await UtilsModule.runJob(
  529. "GET_ERROR",
  530. {
  531. error: err
  532. },
  533. this
  534. );
  535. reject(new Error(err));
  536. } else resolve(station);
  537. }
  538. );
  539. });
  540. }
  541. /**
  542. * Creates the official playlist for a station
  543. *
  544. * @param {object} payload - object that contains the payload
  545. * @param {string} payload.stationId - the id of the station
  546. * @param {Array} payload.songList - list of songs to put in official playlist
  547. * @returns {Promise} - returns a promise (resolve, reject)
  548. */
  549. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  550. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  551. console.log(typeof payload.songList, payload.songList);
  552. return new Promise(resolve => {
  553. const lessInfoPlaylist = [];
  554. return async.each(
  555. payload.songList,
  556. (song, next) => {
  557. SongsModule.runJob("GET_SONG", { id: song }, this)
  558. .then(response => {
  559. const { song } = response;
  560. if (song) {
  561. const newSong = {
  562. _id: song._id,
  563. songId: song.songId,
  564. title: song.title,
  565. artists: song.artists,
  566. duration: song.duration,
  567. thumbnail: song.thumbnail,
  568. requestedAt: song.requestedAt
  569. };
  570. lessInfoPlaylist.push(newSong);
  571. }
  572. })
  573. .finally(() => {
  574. next();
  575. });
  576. },
  577. () => {
  578. CacheModule.runJob(
  579. "HSET",
  580. {
  581. table: "officialPlaylists",
  582. key: payload.stationId,
  583. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  584. },
  585. this
  586. ).finally(() => {
  587. CacheModule.runJob("PUB", {
  588. channel: "station.newOfficialPlaylist",
  589. value: payload.stationId
  590. });
  591. resolve();
  592. });
  593. }
  594. );
  595. });
  596. }
  597. /**
  598. * Skips a station
  599. *
  600. * @param {object} payload - object that contains the payload
  601. * @param {string} payload.stationId - the id of the station to skip
  602. * @returns {Promise} - returns a promise (resolve, reject)
  603. */
  604. SKIP_STATION(payload) {
  605. return new Promise((resolve, reject) => {
  606. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  607. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  608. async.waterfall(
  609. [
  610. next => {
  611. NotificationsModule.runJob("UNSCHEDULE", {
  612. name: `stations.nextSong?id=${payload.stationId}`
  613. })
  614. .then(() => {
  615. next();
  616. })
  617. .catch(next);
  618. },
  619. next => {
  620. StationsModule.runJob(
  621. "GET_STATION",
  622. {
  623. stationId: payload.stationId
  624. },
  625. this
  626. )
  627. .then(station => {
  628. next(null, station);
  629. })
  630. .catch(() => {});
  631. },
  632. // eslint-disable-next-line consistent-return
  633. (station, next) => {
  634. if (!station) return next("Station not found.");
  635. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  636. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  637. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  638. // Community station with party mode enabled and songs in the queue
  639. if (station.paused) return next(null, null, -19, station);
  640. return StationsModule.stationModel.updateOne(
  641. { _id: payload.stationId },
  642. {
  643. $pull: {
  644. queue: {
  645. _id: station.queue[0]._id
  646. }
  647. }
  648. },
  649. err => {
  650. if (err) return next(err);
  651. return next(null, station.queue[0], -12, station);
  652. }
  653. );
  654. }
  655. if (station.type === "community" && !station.partyMode) {
  656. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  657. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  658. if (err) return next(err);
  659. if (!playlist) return next(null, null, -13, station);
  660. playlist = playlist.songs;
  661. if (playlist.length > 0) {
  662. let currentSongIndex;
  663. if (station.currentSongIndex < playlist.length - 1)
  664. currentSongIndex = station.currentSongIndex + 1;
  665. else currentSongIndex = 0;
  666. const callback = (err, song) => {
  667. if (err) return next(err);
  668. if (song) return next(null, song, currentSongIndex, station);
  669. const currentSong = {
  670. songId: playlist[currentSongIndex].songId,
  671. title: playlist[currentSongIndex].title,
  672. duration: playlist[currentSongIndex].duration,
  673. likes: -1,
  674. dislikes: -1,
  675. requestedAt: playlist[currentSongIndex].requestedAt
  676. };
  677. return next(null, currentSong, currentSongIndex, station);
  678. };
  679. if (playlist[currentSongIndex]._id)
  680. return SongsModule.runJob(
  681. "GET_SONG",
  682. {
  683. id: playlist[currentSongIndex]._id
  684. },
  685. this
  686. )
  687. .then(response => callback(null, response.song))
  688. .catch(callback);
  689. return SongsModule.runJob(
  690. "GET_SONG_FROM_ID",
  691. {
  692. songId: playlist[currentSongIndex].songId
  693. },
  694. this
  695. )
  696. .then(response => callback(null, response.song))
  697. .catch(callback);
  698. }
  699. return next(null, null, -14, station);
  700. })
  701. );
  702. }
  703. if (station.type === "official" && station.playlist.length === 0) {
  704. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  705. .then(playlist => {
  706. if (playlist.length === 0)
  707. return next(null, StationsModule.defaultSong, 0, station);
  708. return SongsModule.runJob(
  709. "GET_SONG",
  710. {
  711. id: playlist[0]
  712. },
  713. this
  714. )
  715. .then(response => {
  716. next(null, response.song, 0, station);
  717. })
  718. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  719. })
  720. .catch(err => {
  721. next(err);
  722. });
  723. }
  724. if (station.type === "official" && station.playlist.length > 0) {
  725. return async.doUntil(
  726. next => {
  727. if (station.currentSongIndex < station.playlist.length - 1) {
  728. SongsModule.runJob(
  729. "GET_SONG",
  730. {
  731. id: station.playlist[station.currentSongIndex + 1]
  732. },
  733. this
  734. )
  735. .then(response => next(null, response.song, station.currentSongIndex + 1))
  736. .catch(() => {
  737. station.currentSongIndex += 1;
  738. next(null, null, null);
  739. });
  740. } else {
  741. StationsModule.runJob(
  742. "CALCULATE_SONG_FOR_STATION",
  743. {
  744. station
  745. },
  746. this
  747. )
  748. .then(newPlaylist => {
  749. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  750. .then(response => {
  751. station.playlist = newPlaylist;
  752. next(null, response.song, 0);
  753. })
  754. .catch(() => next(null, StationsModule.defaultSong, 0));
  755. })
  756. .catch(() => {
  757. next(null, StationsModule.defaultSong, 0);
  758. });
  759. }
  760. },
  761. (song, currentSongIndex, next) => {
  762. if (song) return next(null, true, currentSongIndex);
  763. return next(null, false);
  764. },
  765. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  766. );
  767. }
  768. },
  769. (song, currentSongIndex, station, next) => {
  770. const $set = {};
  771. if (song === null) $set.currentSong = null;
  772. else if (song.likes === -1 && song.dislikes === -1) {
  773. $set.currentSong = {
  774. songId: song.songId,
  775. title: song.title,
  776. duration: song.duration,
  777. skipDuration: 0,
  778. likes: -1,
  779. dislikes: -1,
  780. requestedAt: song.requestedAt
  781. };
  782. } else {
  783. $set.currentSong = {
  784. _id: song._id,
  785. songId: song.songId,
  786. title: song.title,
  787. artists: song.artists,
  788. duration: song.duration,
  789. likes: song.likes,
  790. dislikes: song.dislikes,
  791. skipDuration: song.skipDuration,
  792. thumbnail: song.thumbnail,
  793. requestedAt: song.requestedAt
  794. };
  795. }
  796. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  797. $set.startedAt = Date.now();
  798. $set.timePaused = 0;
  799. if (station.paused) $set.pausedAt = Date.now();
  800. next(null, $set, station);
  801. },
  802. ($set, station, next) => {
  803. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, err => {
  804. if (err) return next(err);
  805. return StationsModule.runJob(
  806. "UPDATE_STATION",
  807. {
  808. stationId: station._id
  809. },
  810. this
  811. )
  812. .then(station => {
  813. if (station.type === "community" && station.partyMode === true)
  814. CacheModule.runJob("PUB", {
  815. channel: "station.queueUpdate",
  816. value: payload.stationId
  817. })
  818. .then()
  819. .catch();
  820. next(null, station);
  821. })
  822. .catch(next);
  823. });
  824. }
  825. ],
  826. async (err, station) => {
  827. if (err) {
  828. err = await UtilsModule.runJob(
  829. "GET_ERROR",
  830. {
  831. error: err
  832. },
  833. this
  834. );
  835. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  836. reject(new Error(err));
  837. } else {
  838. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  839. station.currentSong.skipVotes = 0;
  840. }
  841. // TODO Pub/Sub this
  842. IOModule.runJob("EMIT_TO_ROOM", {
  843. room: `station.${station._id}`,
  844. args: [
  845. "event:songs.next",
  846. {
  847. currentSong: station.currentSong,
  848. startedAt: station.startedAt,
  849. paused: station.paused,
  850. timePaused: 0
  851. }
  852. ]
  853. })
  854. .then()
  855. .catch();
  856. if (station.privacy === "public") {
  857. IOModule.runJob("EMIT_TO_ROOM", {
  858. room: "home",
  859. args: ["event:station.nextSong", station._id, station.currentSong]
  860. })
  861. .then()
  862. .catch();
  863. } else {
  864. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  865. Object.keys(sockets).forEach(socketKey => {
  866. const socket = sockets[socketKey];
  867. const { session } = socket;
  868. if (session.sessionId) {
  869. CacheModule.runJob(
  870. "HGET",
  871. {
  872. table: "sessions",
  873. key: session.sessionId
  874. },
  875. this
  876. // eslint-disable-next-line no-loop-func
  877. ).then(session => {
  878. if (session) {
  879. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  880. userModel => {
  881. userModel.findOne(
  882. {
  883. _id: session.userId
  884. },
  885. (err, user) => {
  886. if (!err && user) {
  887. if (user.role === "admin")
  888. socket.emit(
  889. "event:station.nextSong",
  890. station._id,
  891. station.currentSong
  892. );
  893. else if (
  894. station.type === "community" &&
  895. station.owner === session.userId
  896. )
  897. socket.emit(
  898. "event:station.nextSong",
  899. station._id,
  900. station.currentSong
  901. );
  902. }
  903. }
  904. );
  905. }
  906. );
  907. }
  908. });
  909. }
  910. });
  911. }
  912. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  913. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  914. sockets: await IOModule.runJob(
  915. "GET_ROOM_SOCKETS",
  916. {
  917. room: `station.${station._id}`
  918. },
  919. this
  920. ),
  921. room: `song.${station.currentSong.songId}`
  922. });
  923. if (!station.paused) {
  924. NotificationsModule.runJob("SCHEDULE", {
  925. name: `stations.nextSong?id=${station._id}`,
  926. time: station.currentSong.duration * 1000,
  927. station
  928. });
  929. }
  930. } else {
  931. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  932. sockets: await IOModule.runJob(
  933. "GET_ROOM_SOCKETS",
  934. {
  935. room: `station.${station._id}`
  936. },
  937. this
  938. )
  939. })
  940. .then()
  941. .catch();
  942. }
  943. resolve({ station });
  944. }
  945. }
  946. );
  947. });
  948. }
  949. /**
  950. * Checks if a user can view/access a station
  951. *
  952. * @param {object} payload - object that contains the payload
  953. * @param {object} payload.station - the station object of the station in question
  954. * @param {string} payload.userId - the id of the user in question
  955. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  956. * @returns {Promise} - returns a promise (resolve, reject)
  957. */
  958. CAN_USER_VIEW_STATION(payload) {
  959. return new Promise((resolve, reject) => {
  960. async.waterfall(
  961. [
  962. next => {
  963. if (payload.station.privacy === "public") return next(true);
  964. if (payload.station.privacy === "unlisted")
  965. if (payload.hideUnlisted === true) return next();
  966. else return next(true);
  967. if (!payload.userId) return next("Not allowed");
  968. return next();
  969. },
  970. next => {
  971. DBModule.runJob(
  972. "GET_MODEL",
  973. {
  974. modelName: "user"
  975. },
  976. this
  977. ).then(userModel => {
  978. userModel.findOne({ _id: payload.userId }, next);
  979. });
  980. },
  981. (user, next) => {
  982. if (!user) return next("Not allowed");
  983. if (user.role === "admin") return next(true);
  984. if (payload.station.type === "official") return next("Not allowed");
  985. if (payload.station.owner === payload.userId) return next(true);
  986. return next("Not allowed");
  987. }
  988. ],
  989. async errOrResult => {
  990. if (errOrResult !== true && errOrResult !== "Not allowed") {
  991. errOrResult = await UtilsModule.runJob(
  992. "GET_ERROR",
  993. {
  994. error: errOrResult
  995. },
  996. this
  997. );
  998. reject(new Error(errOrResult));
  999. } else {
  1000. resolve(errOrResult === true);
  1001. }
  1002. }
  1003. );
  1004. });
  1005. }
  1006. /**
  1007. * Checks if a user has favorited a station or not
  1008. *
  1009. * @param {object} payload - object that contains the payload
  1010. * @param {object} payload.stationId - the id of the station in question
  1011. * @param {string} payload.userId - the id of the user in question
  1012. * @returns {Promise} - returns a promise (resolve, reject)
  1013. */
  1014. HAS_USER_FAVORITED_STATION(payload) {
  1015. return new Promise((resolve, reject) => {
  1016. async.waterfall(
  1017. [
  1018. next => {
  1019. DBModule.runJob(
  1020. "GET_MODEL",
  1021. {
  1022. modelName: "user"
  1023. },
  1024. this
  1025. ).then(userModel => {
  1026. userModel.findOne({ _id: payload.userId }, next);
  1027. });
  1028. },
  1029. (user, next) => {
  1030. if (!user) return next("User not found.");
  1031. if (user.favoriteStations.indexOf(payload.stationId) !== -1) return next(null, true);
  1032. return next(null, false);
  1033. }
  1034. ],
  1035. async (err, isStationFavorited) => {
  1036. if (err && err !== true) {
  1037. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1038. return reject(new Error(err));
  1039. }
  1040. return resolve(isStationFavorited);
  1041. }
  1042. );
  1043. });
  1044. }
  1045. /**
  1046. * Returns a list of sockets in a room that can and can't know about a station
  1047. *
  1048. * @param {object} payload - the payload object
  1049. * @param {object} payload.station - the station object
  1050. * @param {string} payload.room - the socket.io room to get the sockets from
  1051. * @returns {Promise} - returns a promise (resolve, reject)
  1052. */
  1053. GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION(payload) {
  1054. return new Promise((resolve, reject) => {
  1055. IOModule.runJob("GET_ROOM_SOCKETS", { room: payload.room }, this)
  1056. .then(socketsObject => {
  1057. const sockets = Object.keys(socketsObject).map(socketKey => socketsObject[socketKey]);
  1058. let socketsThatCan = [];
  1059. const socketsThatCannot = [];
  1060. if (payload.station.privacy === "public") {
  1061. socketsThatCan = sockets;
  1062. resolve({ socketsThatCan, socketsThatCannot });
  1063. } else {
  1064. async.eachLimit(
  1065. sockets,
  1066. 1,
  1067. (socket, next) => {
  1068. const { session } = socket;
  1069. async.waterfall(
  1070. [
  1071. next => {
  1072. if (!session.sessionId) next("No session id");
  1073. else next();
  1074. },
  1075. next => {
  1076. CacheModule.runJob(
  1077. "HGET",
  1078. {
  1079. table: "sessions",
  1080. key: session.sessionId
  1081. },
  1082. this
  1083. )
  1084. .then(response => {
  1085. next(null, response);
  1086. })
  1087. .catch(next);
  1088. },
  1089. (session, next) => {
  1090. if (!session) next("No session");
  1091. else {
  1092. DBModule.runJob("GET_MODEL", { modelName: "user" }, this)
  1093. .then(userModel => {
  1094. next(null, userModel);
  1095. })
  1096. .catch(next);
  1097. }
  1098. },
  1099. (userModel, next) => {
  1100. if (!userModel) next("No user model");
  1101. else
  1102. userModel.findOne(
  1103. {
  1104. _id: session.userId
  1105. },
  1106. next
  1107. );
  1108. },
  1109. (user, next) => {
  1110. if (!user) next("No user found");
  1111. else if (user.role === "admin") {
  1112. socketsThatCan.push(socket);
  1113. next();
  1114. } else if (
  1115. payload.station.type === "community" &&
  1116. payload.station.owner === session.userId
  1117. ) {
  1118. socketsThatCan.push(socket);
  1119. next();
  1120. }
  1121. }
  1122. ],
  1123. err => {
  1124. if (err) socketsThatCannot.push(socket);
  1125. next();
  1126. }
  1127. );
  1128. },
  1129. err => {
  1130. if (err) reject(err);
  1131. else resolve({ socketsThatCan, socketsThatCannot });
  1132. }
  1133. );
  1134. }
  1135. })
  1136. .catch(reject);
  1137. });
  1138. }
  1139. }
  1140. export default new _StationsModule();