stations.js 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let IOModule;
  8. let SongsModule;
  9. let NotificationsModule;
  10. class _StationsModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("stations");
  14. StationsModule = this;
  15. }
  16. /**
  17. * Initialises the stations module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. CacheModule = this.moduleManager.modules.cache;
  23. DBModule = this.moduleManager.modules.db;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. IOModule = this.moduleManager.modules.io;
  26. SongsModule = this.moduleManager.modules.songs;
  27. NotificationsModule = this.moduleManager.modules.notifications;
  28. this.defaultSong = {
  29. songId: "60ItHLz5WEA",
  30. title: "Faded - Alan Walker",
  31. duration: 212,
  32. skipDuration: 0,
  33. likes: -1,
  34. dislikes: -1,
  35. requestedAt: Date.now()
  36. };
  37. this.userList = {};
  38. this.usersPerStation = {};
  39. this.usersPerStationCount = {};
  40. // TEMP
  41. CacheModule.runJob("SUB", {
  42. channel: "station.pause",
  43. cb: async stationId => {
  44. NotificationsModule.runJob("REMOVE", {
  45. subscription: `stations.nextSong?id=${stationId}`
  46. }).then();
  47. }
  48. });
  49. CacheModule.runJob("SUB", {
  50. channel: "station.resume",
  51. cb: async stationId => {
  52. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  53. }
  54. });
  55. CacheModule.runJob("SUB", {
  56. channel: "station.queueUpdate",
  57. cb: async stationId => {
  58. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  59. if (!station.currentSong && station.queue.length > 0) {
  60. StationsModule.runJob("INITIALIZE_STATION", {
  61. stationId
  62. }).then();
  63. }
  64. });
  65. }
  66. });
  67. CacheModule.runJob("SUB", {
  68. channel: "station.newOfficialPlaylist",
  69. cb: async stationId => {
  70. CacheModule.runJob("HGET", {
  71. table: "officialPlaylists",
  72. key: stationId
  73. }).then(playlistObj => {
  74. if (playlistObj) {
  75. IOModule.runJob("EMIT_TO_ROOM", {
  76. room: `station.${stationId}`,
  77. args: ["event:newOfficialPlaylist", playlistObj.songs]
  78. });
  79. }
  80. });
  81. }
  82. });
  83. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  84. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  85. return new Promise((resolve, reject) =>
  86. async.waterfall(
  87. [
  88. next => {
  89. this.setStage(2);
  90. CacheModule.runJob("HGETALL", { table: "stations" })
  91. .then(stations => {
  92. next(null, stations);
  93. })
  94. .catch(next);
  95. },
  96. (stations, next) => {
  97. this.setStage(3);
  98. if (!stations) return next();
  99. const stationIds = Object.keys(stations);
  100. return async.each(
  101. stationIds,
  102. (stationId, next) => {
  103. stationModel.findOne({ _id: stationId }, (err, station) => {
  104. if (err) next(err);
  105. else if (!station) {
  106. CacheModule.runJob("HDEL", {
  107. table: "stations",
  108. key: stationId
  109. })
  110. .then(() => {
  111. next();
  112. })
  113. .catch(next);
  114. } else next();
  115. });
  116. },
  117. next
  118. );
  119. },
  120. next => {
  121. this.setStage(4);
  122. stationModel.find({}, next);
  123. },
  124. (stations, next) => {
  125. this.setStage(5);
  126. async.each(
  127. stations,
  128. (station, next2) => {
  129. async.waterfall(
  130. [
  131. next => {
  132. CacheModule.runJob("HSET", {
  133. table: "stations",
  134. key: station._id,
  135. value: stationSchema(station)
  136. })
  137. .then(station => next(null, station))
  138. .catch(next);
  139. },
  140. (station, next) => {
  141. StationsModule.runJob(
  142. "INITIALIZE_STATION",
  143. {
  144. stationId: station._id
  145. },
  146. null,
  147. -1
  148. )
  149. .then(() => {
  150. next();
  151. })
  152. .catch(next);
  153. }
  154. ],
  155. err => {
  156. next2(err);
  157. }
  158. );
  159. },
  160. next
  161. );
  162. }
  163. ],
  164. async err => {
  165. if (err) {
  166. err = await UtilsModule.runJob("GET_ERROR", {
  167. error: err
  168. });
  169. reject(new Error(err));
  170. } else {
  171. resolve();
  172. }
  173. }
  174. )
  175. );
  176. }
  177. /**
  178. * Initialises a station
  179. *
  180. * @param {object} payload - object that contains the payload
  181. * @param {string} payload.stationId - id of the station to initialise
  182. * @returns {Promise} - returns a promise (resolve, reject)
  183. */
  184. INITIALIZE_STATION(payload) {
  185. return new Promise((resolve, reject) => {
  186. // if (typeof cb !== 'function') cb = ()=>{};
  187. async.waterfall(
  188. [
  189. next => {
  190. StationsModule.runJob(
  191. "GET_STATION",
  192. {
  193. stationId: payload.stationId
  194. },
  195. this
  196. )
  197. .then(station => {
  198. next(null, station);
  199. })
  200. .catch(next);
  201. },
  202. (station, next) => {
  203. if (!station) return next("Station not found.");
  204. return NotificationsModule.runJob(
  205. "UNSCHEDULE",
  206. {
  207. name: `stations.nextSong?id=${station._id}`
  208. },
  209. this
  210. )
  211. .then()
  212. .catch()
  213. .finally(() => {
  214. NotificationsModule.runJob("SUBSCRIBE", {
  215. name: `stations.nextSong?id=${station._id}`,
  216. cb: () =>
  217. StationsModule.runJob("SKIP_STATION", {
  218. stationId: station._id
  219. }),
  220. unique: true,
  221. station
  222. })
  223. .then()
  224. .catch();
  225. if (station.paused) return next(true, station);
  226. return next(null, station);
  227. });
  228. },
  229. (station, next) => {
  230. if (!station.currentSong) {
  231. return StationsModule.runJob(
  232. "SKIP_STATION",
  233. {
  234. stationId: station._id
  235. },
  236. this
  237. )
  238. .then(station => {
  239. next(true, station);
  240. })
  241. .catch(next)
  242. .finally(() => {});
  243. }
  244. let timeLeft =
  245. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  246. if (Number.isNaN(timeLeft)) timeLeft = -1;
  247. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  248. return StationsModule.runJob(
  249. "SKIP_STATION",
  250. {
  251. stationId: station._id
  252. },
  253. this
  254. )
  255. .then(station => {
  256. next(null, station);
  257. })
  258. .catch(next);
  259. }
  260. // name, time, cb, station
  261. NotificationsModule.runJob("SCHEDULE", {
  262. name: `stations.nextSong?id=${station._id}`,
  263. time: timeLeft,
  264. station
  265. });
  266. return next(null, station);
  267. }
  268. ],
  269. async (err, station) => {
  270. if (err && err !== true) {
  271. err = await UtilsModule.runJob(
  272. "GET_ERROR",
  273. {
  274. error: err
  275. },
  276. this
  277. );
  278. reject(new Error(err));
  279. } else resolve(station);
  280. }
  281. );
  282. });
  283. }
  284. /**
  285. * Calculates the next song for the station
  286. *
  287. * @param {object} payload - object that contains the payload
  288. * @param {string} payload.station - station object to calculate song for
  289. * @returns {Promise} - returns a promise (resolve, reject)
  290. */
  291. async CALCULATE_SONG_FOR_STATION(payload) {
  292. // station, bypassValidate = false
  293. const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
  294. return new Promise((resolve, reject) => {
  295. const songList = [];
  296. return async.waterfall(
  297. [
  298. next => {
  299. if (payload.station.genres.length === 0) return next();
  300. const genresDone = [];
  301. const blacklistedGenres = payload.station.blacklistedGenres.map(blacklistedGenre =>
  302. blacklistedGenre.toLowerCase()
  303. );
  304. return payload.station.genres.forEach(genre => {
  305. songModel.find({ genres: { $regex: genre, $options: "i" } }, (err, songs) => {
  306. if (!err) {
  307. songs.forEach(song => {
  308. if (songList.indexOf(song._id) === -1) {
  309. let found = false;
  310. song.genres.forEach(songGenre => {
  311. if (blacklistedGenres.indexOf(songGenre.toLowerCase()) !== -1)
  312. found = true;
  313. });
  314. if (!found) {
  315. songList.push(song._id);
  316. }
  317. }
  318. });
  319. }
  320. genresDone.push(genre);
  321. if (genresDone.length === payload.station.genres.length) next();
  322. });
  323. });
  324. },
  325. next => {
  326. const playlist = [];
  327. songList.forEach(songId => {
  328. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  329. });
  330. // eslint-disable-next-line array-callback-return
  331. payload.station.playlist.filter(songId => {
  332. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  333. });
  334. UtilsModule.runJob("SHUFFLE", { array: playlist })
  335. .then(result => {
  336. next(null, result.array);
  337. }, this)
  338. .catch(next);
  339. },
  340. (playlist, next) => {
  341. StationsModule.runJob(
  342. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  343. {
  344. stationId: payload.station._id,
  345. songList: playlist
  346. },
  347. this
  348. )
  349. .then(() => {
  350. next(null, playlist);
  351. })
  352. .catch(next);
  353. },
  354. (playlist, next) => {
  355. StationsModule.stationModel.updateOne(
  356. { _id: payload.station._id },
  357. { $set: { playlist } },
  358. { runValidators: true },
  359. () => {
  360. StationsModule.runJob(
  361. "UPDATE_STATION",
  362. {
  363. stationId: payload.station._id
  364. },
  365. this
  366. )
  367. .then(() => {
  368. next(null, playlist);
  369. })
  370. .catch(next);
  371. }
  372. );
  373. }
  374. ],
  375. (err, newPlaylist) => {
  376. if (err) return reject(new Error(err));
  377. return resolve(newPlaylist);
  378. }
  379. );
  380. });
  381. }
  382. /**
  383. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  384. *
  385. * @param {object} payload - object that contains the payload
  386. * @param {string} payload.stationId - id of the station
  387. * @returns {Promise} - returns a promise (resolve, reject)
  388. */
  389. GET_STATION(payload) {
  390. return new Promise((resolve, reject) => {
  391. async.waterfall(
  392. [
  393. next => {
  394. CacheModule.runJob("HGET", { table: "stations", key: payload.stationId }, this)
  395. .then(station => next(null, station))
  396. .catch(next);
  397. },
  398. (station, next) => {
  399. if (station) return next(true, station);
  400. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  401. },
  402. (station, next) => {
  403. if (station) {
  404. if (station.type === "official") {
  405. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  406. stationId: station._id,
  407. songList: station.playlist
  408. })
  409. .then()
  410. .catch();
  411. }
  412. station = StationsModule.stationSchema(station);
  413. CacheModule.runJob("HSET", {
  414. table: "stations",
  415. key: payload.stationId,
  416. value: station
  417. })
  418. .then()
  419. .catch();
  420. next(true, station);
  421. } else next("Station not found");
  422. }
  423. ],
  424. async (err, station) => {
  425. if (err && err !== true) {
  426. err = await UtilsModule.runJob(
  427. "GET_ERROR",
  428. {
  429. error: err
  430. },
  431. this
  432. );
  433. reject(new Error(err));
  434. } else resolve(station);
  435. }
  436. );
  437. });
  438. }
  439. /**
  440. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  441. *
  442. * @param {object} payload - object that contains the payload
  443. * @param {string} payload.stationName - the unique name of the station
  444. * @returns {Promise} - returns a promise (resolve, reject)
  445. */
  446. async GET_STATION_BY_NAME(payload) {
  447. return new Promise((resolve, reject) =>
  448. async.waterfall(
  449. [
  450. next => {
  451. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  452. },
  453. (station, next) => {
  454. if (station) {
  455. if (station.type === "official") {
  456. StationsModule.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  457. stationId: station._id,
  458. songList: station.playlist
  459. });
  460. }
  461. station = StationsModule.stationSchema(station);
  462. CacheModule.runJob("HSET", {
  463. table: "stations",
  464. key: station._id,
  465. value: station
  466. });
  467. next(true, station);
  468. } else next("Station not found");
  469. }
  470. ],
  471. (err, station) => {
  472. if (err && err !== true) return reject(new Error(err));
  473. return resolve(station);
  474. }
  475. )
  476. );
  477. }
  478. /**
  479. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  480. *
  481. * @param {object} payload - object that contains the payload
  482. * @param {string} payload.stationId - the id of the station to update
  483. * @returns {Promise} - returns a promise (resolve, reject)
  484. */
  485. UPDATE_STATION(payload) {
  486. return new Promise((resolve, reject) => {
  487. async.waterfall(
  488. [
  489. next => {
  490. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  491. },
  492. (station, next) => {
  493. if (!station) {
  494. CacheModule.runJob("HDEL", {
  495. table: "stations",
  496. key: payload.stationId
  497. })
  498. .then()
  499. .catch();
  500. return next("Station not found");
  501. }
  502. return CacheModule.runJob(
  503. "HSET",
  504. {
  505. table: "stations",
  506. key: payload.stationId,
  507. value: station
  508. },
  509. this
  510. )
  511. .then(station => {
  512. next(null, station);
  513. })
  514. .catch(next);
  515. }
  516. ],
  517. async (err, station) => {
  518. if (err && err !== true) {
  519. err = await UtilsModule.runJob(
  520. "GET_ERROR",
  521. {
  522. error: err
  523. },
  524. this
  525. );
  526. reject(new Error(err));
  527. } else resolve(station);
  528. }
  529. );
  530. });
  531. }
  532. /**
  533. * Creates the official playlist for a station
  534. *
  535. * @param {object} payload - object that contains the payload
  536. * @param {string} payload.stationId - the id of the station
  537. * @param {Array} payload.songList - list of songs to put in official playlist
  538. * @returns {Promise} - returns a promise (resolve, reject)
  539. */
  540. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  541. const officialPlaylistSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" }, this);
  542. console.log(typeof payload.songList, payload.songList);
  543. return new Promise(resolve => {
  544. const lessInfoPlaylist = [];
  545. return async.each(
  546. payload.songList,
  547. (song, next) => {
  548. SongsModule.runJob("GET_SONG", { id: song }, this)
  549. .then(response => {
  550. const { song } = response;
  551. if (song) {
  552. const newSong = {
  553. _id: song._id,
  554. songId: song.songId,
  555. title: song.title,
  556. artists: song.artists,
  557. duration: song.duration,
  558. thumbnail: song.thumbnail,
  559. requestedAt: song.requestedAt
  560. };
  561. lessInfoPlaylist.push(newSong);
  562. }
  563. })
  564. .finally(() => {
  565. next();
  566. });
  567. },
  568. () => {
  569. CacheModule.runJob(
  570. "HSET",
  571. {
  572. table: "officialPlaylists",
  573. key: payload.stationId,
  574. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  575. },
  576. this
  577. ).finally(() => {
  578. CacheModule.runJob("PUB", {
  579. channel: "station.newOfficialPlaylist",
  580. value: payload.stationId
  581. });
  582. resolve();
  583. });
  584. }
  585. );
  586. });
  587. }
  588. /**
  589. * Skips a station
  590. *
  591. * @param {object} payload - object that contains the payload
  592. * @param {string} payload.stationId - the id of the station to skip
  593. * @returns {Promise} - returns a promise (resolve, reject)
  594. */
  595. SKIP_STATION(payload) {
  596. return new Promise((resolve, reject) => {
  597. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  598. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  599. async.waterfall(
  600. [
  601. next => {
  602. NotificationsModule.runJob("UNSCHEDULE", {
  603. name: `stations.nextSong?id=${payload.stationId}`
  604. })
  605. .then(() => {
  606. next();
  607. })
  608. .catch(next);
  609. },
  610. next => {
  611. StationsModule.runJob(
  612. "GET_STATION",
  613. {
  614. stationId: payload.stationId
  615. },
  616. this
  617. )
  618. .then(station => {
  619. next(null, station);
  620. })
  621. .catch(() => {});
  622. },
  623. // eslint-disable-next-line consistent-return
  624. (station, next) => {
  625. if (!station) return next("Station not found.");
  626. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  627. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  628. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  629. // Community station with party mode enabled and songs in the queue
  630. if (station.paused) return next(null, null, -19, station);
  631. return StationsModule.stationModel.updateOne(
  632. { _id: payload.stationId },
  633. {
  634. $pull: {
  635. queue: {
  636. _id: station.queue[0]._id
  637. }
  638. }
  639. },
  640. err => {
  641. if (err) return next(err);
  642. return next(null, station.queue[0], -12, station);
  643. }
  644. );
  645. }
  646. if (station.type === "community" && !station.partyMode) {
  647. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  648. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  649. if (err) return next(err);
  650. if (!playlist) return next(null, null, -13, station);
  651. playlist = playlist.songs;
  652. if (playlist.length > 0) {
  653. let currentSongIndex;
  654. if (station.currentSongIndex < playlist.length - 1)
  655. currentSongIndex = station.currentSongIndex + 1;
  656. else currentSongIndex = 0;
  657. const callback = (err, song) => {
  658. if (err) return next(err);
  659. if (song) return next(null, song, currentSongIndex, station);
  660. const currentSong = {
  661. songId: playlist[currentSongIndex].songId,
  662. title: playlist[currentSongIndex].title,
  663. duration: playlist[currentSongIndex].duration,
  664. likes: -1,
  665. dislikes: -1,
  666. requestedAt: playlist[currentSongIndex].requestedAt
  667. };
  668. return next(null, currentSong, currentSongIndex, station);
  669. };
  670. if (playlist[currentSongIndex]._id)
  671. return SongsModule.runJob(
  672. "GET_SONG",
  673. {
  674. id: playlist[currentSongIndex]._id
  675. },
  676. this
  677. )
  678. .then(response => callback(null, response.song))
  679. .catch(callback);
  680. return SongsModule.runJob(
  681. "GET_SONG_FROM_ID",
  682. {
  683. songId: playlist[currentSongIndex].songId
  684. },
  685. this
  686. )
  687. .then(response => callback(null, response.song))
  688. .catch(callback);
  689. }
  690. return next(null, null, -14, station);
  691. })
  692. );
  693. }
  694. if (station.type === "official" && station.playlist.length === 0) {
  695. return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
  696. .then(playlist => {
  697. if (playlist.length === 0)
  698. return next(null, StationsModule.defaultSong, 0, station);
  699. return SongsModule.runJob(
  700. "GET_SONG",
  701. {
  702. id: playlist[0]
  703. },
  704. this
  705. )
  706. .then(response => {
  707. next(null, response.song, 0, station);
  708. })
  709. .catch(() => next(null, StationsModule.defaultSong, 0, station));
  710. })
  711. .catch(err => {
  712. next(err);
  713. });
  714. }
  715. if (station.type === "official" && station.playlist.length > 0) {
  716. return async.doUntil(
  717. next => {
  718. if (station.currentSongIndex < station.playlist.length - 1) {
  719. SongsModule.runJob(
  720. "GET_SONG",
  721. {
  722. id: station.playlist[station.currentSongIndex + 1]
  723. },
  724. this
  725. )
  726. .then(response => next(null, response.song, station.currentSongIndex + 1))
  727. .catch(() => {
  728. station.currentSongIndex += 1;
  729. next(null, null, null);
  730. });
  731. } else {
  732. StationsModule.runJob(
  733. "CALCULATE_SONG_FOR_STATION",
  734. {
  735. station
  736. },
  737. this
  738. )
  739. .then(newPlaylist => {
  740. SongsModule.runJob("GET_SONG", { id: newPlaylist[0] }, this)
  741. .then(response => {
  742. station.playlist = newPlaylist;
  743. next(null, response.song, 0);
  744. })
  745. .catch(() => next(null, StationsModule.defaultSong, 0));
  746. })
  747. .catch(() => {
  748. next(null, StationsModule.defaultSong, 0);
  749. });
  750. }
  751. },
  752. (song, currentSongIndex, next) => {
  753. if (song) return next(null, true, currentSongIndex);
  754. return next(null, false);
  755. },
  756. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  757. );
  758. }
  759. },
  760. (song, currentSongIndex, station, next) => {
  761. const $set = {};
  762. if (song === null) $set.currentSong = null;
  763. else if (song.likes === -1 && song.dislikes === -1) {
  764. $set.currentSong = {
  765. songId: song.songId,
  766. title: song.title,
  767. duration: song.duration,
  768. skipDuration: 0,
  769. likes: -1,
  770. dislikes: -1,
  771. requestedAt: song.requestedAt
  772. };
  773. } else {
  774. $set.currentSong = {
  775. _id: song._id,
  776. songId: song.songId,
  777. title: song.title,
  778. artists: song.artists,
  779. duration: song.duration,
  780. likes: song.likes,
  781. dislikes: song.dislikes,
  782. skipDuration: song.skipDuration,
  783. thumbnail: song.thumbnail,
  784. requestedAt: song.requestedAt
  785. };
  786. }
  787. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  788. $set.startedAt = Date.now();
  789. $set.timePaused = 0;
  790. if (station.paused) $set.pausedAt = Date.now();
  791. next(null, $set, station);
  792. },
  793. ($set, station, next) => {
  794. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, err => {
  795. if (err) return next(err);
  796. return StationsModule.runJob(
  797. "UPDATE_STATION",
  798. {
  799. stationId: station._id
  800. },
  801. this
  802. )
  803. .then(station => {
  804. if (station.type === "community" && station.partyMode === true)
  805. CacheModule.runJob("PUB", {
  806. channel: "station.queueUpdate",
  807. value: payload.stationId
  808. })
  809. .then()
  810. .catch();
  811. next(null, station);
  812. })
  813. .catch(next);
  814. });
  815. }
  816. ],
  817. async (err, station) => {
  818. if (err) {
  819. err = await UtilsModule.runJob(
  820. "GET_ERROR",
  821. {
  822. error: err
  823. },
  824. this
  825. );
  826. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  827. reject(new Error(err));
  828. } else {
  829. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  830. station.currentSong.skipVotes = 0;
  831. }
  832. // TODO Pub/Sub this
  833. IOModule.runJob("EMIT_TO_ROOM", {
  834. room: `station.${station._id}`,
  835. args: [
  836. "event:songs.next",
  837. {
  838. currentSong: station.currentSong,
  839. startedAt: station.startedAt,
  840. paused: station.paused,
  841. timePaused: 0
  842. }
  843. ]
  844. })
  845. .then()
  846. .catch();
  847. if (station.privacy === "public") {
  848. IOModule.runJob("EMIT_TO_ROOM", {
  849. room: "home",
  850. args: ["event:station.nextSong", station._id, station.currentSong]
  851. })
  852. .then()
  853. .catch();
  854. } else {
  855. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  856. Object.keys(sockets).forEach(socketKey => {
  857. const socket = sockets[socketKey];
  858. const { session } = socket;
  859. if (session.sessionId) {
  860. CacheModule.runJob(
  861. "HGET",
  862. {
  863. table: "sessions",
  864. key: session.sessionId
  865. },
  866. this
  867. // eslint-disable-next-line no-loop-func
  868. ).then(session => {
  869. if (session) {
  870. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  871. userModel => {
  872. userModel.findOne(
  873. {
  874. _id: session.userId
  875. },
  876. (err, user) => {
  877. if (!err && user) {
  878. if (user.role === "admin")
  879. socket.emit(
  880. "event:station.nextSong",
  881. station._id,
  882. station.currentSong
  883. );
  884. else if (
  885. station.type === "community" &&
  886. station.owner === session.userId
  887. )
  888. socket.emit(
  889. "event:station.nextSong",
  890. station._id,
  891. station.currentSong
  892. );
  893. }
  894. }
  895. );
  896. }
  897. );
  898. }
  899. });
  900. }
  901. });
  902. }
  903. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  904. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  905. sockets: await IOModule.runJob(
  906. "GET_ROOM_SOCKETS",
  907. {
  908. room: `station.${station._id}`
  909. },
  910. this
  911. ),
  912. room: `song.${station.currentSong.songId}`
  913. });
  914. if (!station.paused) {
  915. NotificationsModule.runJob("SCHEDULE", {
  916. name: `stations.nextSong?id=${station._id}`,
  917. time: station.currentSong.duration * 1000,
  918. station
  919. });
  920. }
  921. } else {
  922. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  923. sockets: await IOModule.runJob(
  924. "GET_ROOM_SOCKETS",
  925. {
  926. room: `station.${station._id}`
  927. },
  928. this
  929. )
  930. })
  931. .then()
  932. .catch();
  933. }
  934. resolve({ station });
  935. }
  936. }
  937. );
  938. });
  939. }
  940. /**
  941. * Checks if a user can view/access a station
  942. *
  943. * @param {object} payload - object that contains the payload
  944. * @param {object} payload.station - the station object of the station in question
  945. * @param {string} payload.userId - the id of the user in question
  946. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  947. * @returns {Promise} - returns a promise (resolve, reject)
  948. */
  949. CAN_USER_VIEW_STATION(payload) {
  950. return new Promise((resolve, reject) => {
  951. async.waterfall(
  952. [
  953. next => {
  954. if (payload.station.privacy === "public") return next(true);
  955. if (payload.station.privacy === "unlisted")
  956. if (payload.hideUnlisted === true) return next();
  957. else return next(true);
  958. if (!payload.userId) return next("Not allowed");
  959. return next();
  960. },
  961. next => {
  962. DBModule.runJob(
  963. "GET_MODEL",
  964. {
  965. modelName: "user"
  966. },
  967. this
  968. ).then(userModel => {
  969. userModel.findOne({ _id: payload.userId }, next);
  970. });
  971. },
  972. (user, next) => {
  973. if (!user) return next("Not allowed");
  974. if (user.role === "admin") return next(true);
  975. if (payload.station.type === "official") return next("Not allowed");
  976. if (payload.station.owner === payload.userId) return next(true);
  977. return next("Not allowed");
  978. }
  979. ],
  980. async errOrResult => {
  981. if (errOrResult !== true && errOrResult !== "Not allowed") {
  982. errOrResult = await UtilsModule.runJob(
  983. "GET_ERROR",
  984. {
  985. error: errOrResult
  986. },
  987. this
  988. );
  989. reject(new Error(errOrResult));
  990. } else {
  991. resolve(errOrResult === true);
  992. }
  993. }
  994. );
  995. });
  996. }
  997. /**
  998. * Checks if a user has favorited a station or not
  999. *
  1000. * @param {object} payload - object that contains the payload
  1001. * @param {object} payload.stationId - the id of the station in question
  1002. * @param {string} payload.userId - the id of the user in question
  1003. * @returns {Promise} - returns a promise (resolve, reject)
  1004. */
  1005. HAS_USER_FAVORITED_STATION(payload) {
  1006. return new Promise((resolve, reject) => {
  1007. async.waterfall(
  1008. [
  1009. next => {
  1010. DBModule.runJob(
  1011. "GET_MODEL",
  1012. {
  1013. modelName: "user"
  1014. },
  1015. this
  1016. ).then(userModel => {
  1017. userModel.findOne({ _id: payload.userId }, next);
  1018. });
  1019. },
  1020. (user, next) => {
  1021. if (!user) return next("User not found.");
  1022. if (user.favoriteStations.indexOf(payload.stationId) !== -1) return next(null, true);
  1023. return next(null, false);
  1024. }
  1025. ],
  1026. async (err, isStationFavorited) => {
  1027. if (err && err !== true) {
  1028. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1029. return reject(new Error(err));
  1030. }
  1031. return resolve(isStationFavorited);
  1032. }
  1033. );
  1034. });
  1035. }
  1036. /**
  1037. * Returns a list of sockets in a room that can and can't know about a station
  1038. *
  1039. * @param {object} payload - the payload object
  1040. * @param {object} payload.station - the station object
  1041. * @param {string} payload.room - the socket.io room to get the sockets from
  1042. * @returns {Promise} - returns a promise (resolve, reject)
  1043. */
  1044. GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION(payload) {
  1045. return new Promise((resolve, reject) => {
  1046. IOModule.runJob("GET_ROOM_SOCKETS", { room: payload.room }, this)
  1047. .then(socketsObject => {
  1048. const sockets = Object.keys(socketsObject).map(socketKey => socketsObject[socketKey]);
  1049. let socketsThatCan = [];
  1050. const socketsThatCannot = [];
  1051. if (payload.station.privacy === "public") {
  1052. socketsThatCan = sockets;
  1053. resolve({ socketsThatCan, socketsThatCannot });
  1054. } else {
  1055. async.eachLimit(
  1056. sockets,
  1057. 1,
  1058. (socket, next) => {
  1059. const { session } = socket;
  1060. async.waterfall(
  1061. [
  1062. next => {
  1063. if (!session.sessionId) next("No session id");
  1064. else next();
  1065. },
  1066. next => {
  1067. CacheModule.runJob(
  1068. "HGET",
  1069. {
  1070. table: "sessions",
  1071. key: session.sessionId
  1072. },
  1073. this
  1074. )
  1075. .then(response => {
  1076. next(null, response);
  1077. })
  1078. .catch(next);
  1079. },
  1080. (session, next) => {
  1081. if (!session) next("No session");
  1082. else {
  1083. DBModule.runJob("GET_MODEL", { modelName: "user" }, this)
  1084. .then(userModel => {
  1085. next(null, userModel);
  1086. })
  1087. .catch(next);
  1088. }
  1089. },
  1090. (userModel, next) => {
  1091. if (!userModel) next("No user model");
  1092. else
  1093. userModel.findOne(
  1094. {
  1095. _id: session.userId
  1096. },
  1097. next
  1098. );
  1099. },
  1100. (user, next) => {
  1101. if (!user) next("No user found");
  1102. else if (user.role === "admin") {
  1103. socketsThatCan.push(socket);
  1104. next();
  1105. } else if (
  1106. payload.station.type === "community" &&
  1107. payload.station.owner === session.userId
  1108. ) {
  1109. socketsThatCan.push(socket);
  1110. next();
  1111. }
  1112. }
  1113. ],
  1114. err => {
  1115. if (err) socketsThatCannot.push(socket);
  1116. next();
  1117. }
  1118. );
  1119. },
  1120. err => {
  1121. if (err) reject(err);
  1122. else resolve({ socketsThatCan, socketsThatCannot });
  1123. }
  1124. );
  1125. }
  1126. })
  1127. .catch(reject);
  1128. });
  1129. }
  1130. }
  1131. export default new _StationsModule();