stations.js 27 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033
  1. import async from "async";
  2. import CoreClass from "../core";
  3. class StationsModule extends CoreClass {
  4. // eslint-disable-next-line require-jsdoc
  5. constructor() {
  6. super("stations");
  7. }
  8. /**
  9. * Initialises the stations module
  10. *
  11. * @returns {Promise} - returns promise (reject, resolve)
  12. */
  13. async initialize() {
  14. this.cache = this.moduleManager.modules.cache;
  15. this.db = this.moduleManager.modules.db;
  16. this.utils = this.moduleManager.modules.utils;
  17. this.songs = this.moduleManager.modules.songs;
  18. this.notifications = this.moduleManager.modules.notifications;
  19. this.defaultSong = {
  20. songId: "60ItHLz5WEA",
  21. title: "Faded - Alan Walker",
  22. duration: 212,
  23. skipDuration: 0,
  24. likes: -1,
  25. dislikes: -1
  26. };
  27. // TEMP
  28. this.cache.runJob("SUB", {
  29. channel: "station.pause",
  30. cb: async stationId => {
  31. this.notifications
  32. .runJob("REMOVE", {
  33. subscription: `stations.nextSong?id=${stationId}`
  34. })
  35. .then();
  36. }
  37. });
  38. this.cache.runJob("SUB", {
  39. channel: "station.resume",
  40. cb: async stationId => {
  41. this.runJob("INITIALIZE_STATION", { stationId }).then();
  42. }
  43. });
  44. this.cache.runJob("SUB", {
  45. channel: "station.queueUpdate",
  46. cb: async stationId => {
  47. this.runJob("GET_STATION", { stationId }).then(station => {
  48. if (!station.currentSong && station.queue.length > 0) {
  49. this.runJob("INITIALIZE_STATION", {
  50. stationId
  51. }).then();
  52. }
  53. });
  54. }
  55. });
  56. this.cache.runJob("SUB", {
  57. channel: "station.newOfficialPlaylist",
  58. cb: async stationId => {
  59. this.cache
  60. .runJob("HGET", {
  61. table: "officialPlaylists",
  62. key: stationId
  63. })
  64. .then(playlistObj => {
  65. if (playlistObj) {
  66. this.utils.runJob("EMIT_TO_ROOM", {
  67. room: `station.${stationId}`,
  68. args: ["event:newOfficialPlaylist", playlistObj.songs]
  69. });
  70. }
  71. });
  72. }
  73. });
  74. const stationModel = (this.stationModel = await this.db.runJob("GET_MODEL", { modelName: "station" }));
  75. const stationSchema = (this.stationSchema = await this.cache.runJob("GET_SCHEMA", { schemaName: "station" }));
  76. return new Promise((resolve, reject) =>
  77. async.waterfall(
  78. [
  79. next => {
  80. this.setStage(2);
  81. this.cache
  82. .runJob("HGETALL", { table: "stations" })
  83. .then(stations => {
  84. next(null, stations);
  85. })
  86. .catch(next);
  87. },
  88. (stations, next) => {
  89. this.setStage(3);
  90. if (!stations) return next();
  91. const stationIds = Object.keys(stations);
  92. return async.each(
  93. stationIds,
  94. (stationId, next) => {
  95. stationModel.findOne({ _id: stationId }, (err, station) => {
  96. if (err) next(err);
  97. else if (!station) {
  98. this.cache
  99. .runJob("HDEL", {
  100. table: "stations",
  101. key: stationId
  102. })
  103. .then(() => {
  104. next();
  105. })
  106. .catch(next);
  107. } else next();
  108. });
  109. },
  110. next
  111. );
  112. },
  113. next => {
  114. this.setStage(4);
  115. stationModel.find({}, next);
  116. },
  117. (stations, next) => {
  118. this.setStage(5);
  119. async.each(
  120. stations,
  121. (station, next2) => {
  122. async.waterfall(
  123. [
  124. next => {
  125. this.cache
  126. .runJob("HSET", {
  127. table: "stations",
  128. key: station._id,
  129. value: stationSchema(station)
  130. })
  131. .then(station => next(null, station))
  132. .catch(next);
  133. },
  134. (station, next) => {
  135. this.runJob(
  136. "INITIALIZE_STATION",
  137. {
  138. stationId: station._id,
  139. bypassQueue: true
  140. },
  141. { bypassQueue: true }
  142. )
  143. .then(() => {
  144. next();
  145. })
  146. .catch(next); // bypassQueue is true because otherwise the module will never initialize
  147. }
  148. ],
  149. err => {
  150. next2(err);
  151. }
  152. );
  153. },
  154. next
  155. );
  156. }
  157. ],
  158. async err => {
  159. if (err) {
  160. err = await this.utils.runJob("GET_ERROR", {
  161. error: err
  162. });
  163. reject(new Error(err));
  164. } else {
  165. resolve();
  166. }
  167. }
  168. )
  169. );
  170. }
  171. /**
  172. * Initialises a station
  173. *
  174. * @param {object} payload - object that contains the payload
  175. * @param {string} payload.stationId - id of the station to initialise
  176. * @param {boolean} payload.bypassQueue - UNKNOWN
  177. * @returns {Promise} - returns a promise (resolve, reject)
  178. */
  179. INITIALIZE_STATION(payload) {
  180. return new Promise((resolve, reject) => {
  181. // if (typeof cb !== 'function') cb = ()=>{};
  182. async.waterfall(
  183. [
  184. next => {
  185. this.runJob(
  186. "GET_STATION",
  187. {
  188. stationId: payload.stationId,
  189. bypassQueue: payload.bypassQueue
  190. },
  191. { bypassQueue: payload.bypassQueue }
  192. )
  193. .then(station => {
  194. next(null, station);
  195. })
  196. .catch(next);
  197. },
  198. (station, next) => {
  199. if (!station) return next("Station not found.");
  200. this.notifications
  201. .runJob("UNSCHEDULE", {
  202. name: `stations.nextSong?id=${station._id}`
  203. })
  204. .then()
  205. .catch();
  206. this.notifications
  207. .runJob("SUBSCRIBE", {
  208. name: `stations.nextSong?id=${station._id}`,
  209. cb: () =>
  210. this.runJob("SKIP_STATION", {
  211. stationId: station._id
  212. }),
  213. unique: true,
  214. station
  215. })
  216. .then()
  217. .catch();
  218. if (station.paused) return next(true, station);
  219. return next(null, station);
  220. },
  221. (station, next) => {
  222. if (!station.currentSong) {
  223. return this.runJob(
  224. "SKIP_STATION",
  225. {
  226. stationId: station._id,
  227. bypassQueue: payload.bypassQueue
  228. },
  229. { bypassQueue: payload.bypassQueue }
  230. )
  231. .then(station => {
  232. next(true, station);
  233. })
  234. .catch(next)
  235. .finally(() => {});
  236. }
  237. let timeLeft =
  238. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  239. if (Number.isNaN(timeLeft)) timeLeft = -1;
  240. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  241. return this.runJob(
  242. "SKIP_STATION",
  243. {
  244. stationId: station._id,
  245. bypassQueue: payload.bypassQueue
  246. },
  247. { bypassQueue: payload.bypassQueue }
  248. )
  249. .then(station => {
  250. next(null, station);
  251. })
  252. .catch(next);
  253. }
  254. // name, time, cb, station
  255. this.notifications.runJob("SCHEDULE", {
  256. name: `stations.nextSong?id=${station._id}`,
  257. time: timeLeft,
  258. station
  259. });
  260. return next(null, station);
  261. }
  262. ],
  263. async (err, station) => {
  264. if (err && err !== true) {
  265. err = await this.utils.runJob("GET_ERROR", {
  266. error: err
  267. });
  268. reject(new Error(err));
  269. } else resolve(station);
  270. }
  271. );
  272. });
  273. }
  274. /**
  275. * Calculates the next song for the station
  276. *
  277. * @param {object} payload - object that contains the payload
  278. * @param {string} payload.station - station object to calculate song for
  279. * @param {boolean} payload.bypassQueue - UNKNOWN
  280. * @returns {Promise} - returns a promise (resolve, reject)
  281. */
  282. async CALCULATE_SONG_FOR_STATION(payload) {
  283. // station, bypassValidate = false
  284. const stationModel = await this.db.runJob("GET_MODEL", { modelName: "station" });
  285. const songModel = await this.db.runJob("GET_MODEL", { modelName: "song" });
  286. return new Promise((resolve, reject) => {
  287. const songList = [];
  288. return async.waterfall(
  289. [
  290. next => {
  291. if (payload.station.genres.length === 0) return next();
  292. const genresDone = [];
  293. return payload.station.genres.forEach(genre => {
  294. songModel.find({ genres: genre }, (err, songs) => {
  295. if (!err) {
  296. songs.forEach(song => {
  297. if (songList.indexOf(song._id) === -1) {
  298. let found = false;
  299. song.genres.forEach(songGenre => {
  300. if (payload.station.blacklistedGenres.indexOf(songGenre) !== -1)
  301. found = true;
  302. });
  303. if (!found) {
  304. songList.push(song._id);
  305. }
  306. }
  307. });
  308. }
  309. genresDone.push(genre);
  310. if (genresDone.length === payload.station.genres.length) next();
  311. });
  312. });
  313. },
  314. next => {
  315. const playlist = [];
  316. songList.forEach(songId => {
  317. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  318. });
  319. // eslint-disable-next-line array-callback-return
  320. payload.station.playlist.filter(songId => {
  321. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  322. });
  323. this.utils
  324. .runJob("SHUFFLE", { array: playlist })
  325. .then(result => {
  326. next(null, result.array);
  327. })
  328. .catch(next);
  329. },
  330. (playlist, next) => {
  331. this.runJob(
  332. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  333. {
  334. stationId: payload.station._id,
  335. songList: playlist,
  336. bypassQueue: payload.bypassQueue
  337. },
  338. { bypassQueue: payload.bypassQueue }
  339. )
  340. .then(() => {
  341. next(null, playlist);
  342. })
  343. .catch(next);
  344. },
  345. (playlist, next) => {
  346. stationModel.updateOne(
  347. { _id: payload.station._id },
  348. { $set: { playlist } },
  349. { runValidators: true },
  350. () => {
  351. this.runJob(
  352. "UPDATE_STATION",
  353. {
  354. stationId: payload.station._id,
  355. bypassQueue: payload.bypassQueue
  356. },
  357. { bypassQueue: payload.bypassQueue }
  358. )
  359. .then(() => {
  360. next(null, playlist);
  361. })
  362. .catch(next);
  363. }
  364. );
  365. }
  366. ],
  367. (err, newPlaylist) => {
  368. if (err) return reject(new Error(err));
  369. return resolve(newPlaylist);
  370. }
  371. );
  372. });
  373. }
  374. /**
  375. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  376. *
  377. * @param {object} payload - object that contains the payload
  378. * @param {string} payload.stationId - id of the station
  379. * @returns {Promise} - returns a promise (resolve, reject)
  380. */
  381. GET_STATION(payload) {
  382. return new Promise((resolve, reject) => {
  383. async.waterfall(
  384. [
  385. next => {
  386. this.cache
  387. .runJob("HGET", {
  388. table: "stations",
  389. key: payload.stationId
  390. })
  391. .then(station => {
  392. next(null, station);
  393. })
  394. .catch(next);
  395. },
  396. (station, next) => {
  397. if (station) return next(true, station);
  398. return this.stationModel.findOne({ _id: payload.stationId }, next);
  399. },
  400. (station, next) => {
  401. if (station) {
  402. if (station.type === "official") {
  403. this.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  404. stationId: station._id,
  405. songList: station.playlist
  406. })
  407. .then()
  408. .catch();
  409. }
  410. station = this.stationSchema(station);
  411. this.cache
  412. .runJob("HSET", {
  413. table: "stations",
  414. key: payload.stationId,
  415. value: station
  416. })
  417. .then()
  418. .catch();
  419. next(true, station);
  420. } else next("Station not found");
  421. }
  422. ],
  423. async (err, station) => {
  424. if (err && err !== true) {
  425. err = await this.utils.runJob("GET_ERROR", {
  426. error: err
  427. });
  428. reject(new Error(err));
  429. } else resolve(station);
  430. }
  431. );
  432. });
  433. }
  434. /**
  435. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  436. *
  437. * @param {object} payload - object that contains the payload
  438. * @param {string} payload.stationName - the unique name of the station
  439. * @returns {Promise} - returns a promise (resolve, reject)
  440. */
  441. async GET_STATION_BY_NAME(payload) {
  442. const stationModel = await this.db.runJob("GET_MODEL", { modelName: "station" });
  443. return new Promise((resolve, reject) =>
  444. async.waterfall(
  445. [
  446. next => {
  447. stationModel.findOne({ name: payload.stationName }, next);
  448. },
  449. (station, next) => {
  450. if (station) {
  451. if (station.type === "official") {
  452. this.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  453. stationId: station._id,
  454. songList: station.playlist
  455. });
  456. }
  457. this.cache.runJob("GET_SCHEMA", { schemaName: "station" }).then(stationSchema => {
  458. station = stationSchema(station);
  459. this.cache.runJob("HSET", {
  460. table: "stations",
  461. key: station._id,
  462. value: station
  463. });
  464. next(true, station);
  465. });
  466. } else next("Station not found");
  467. }
  468. ],
  469. (err, station) => {
  470. if (err && err !== true) return reject(new Error(err));
  471. return resolve(station);
  472. }
  473. )
  474. );
  475. }
  476. /**
  477. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  478. *
  479. * @param {object} payload - object that contains the payload
  480. * @param {string} payload.stationId - the id of the station to update
  481. * @returns {Promise} - returns a promise (resolve, reject)
  482. */
  483. UPDATE_STATION(payload) {
  484. return new Promise((resolve, reject) => {
  485. async.waterfall(
  486. [
  487. next => {
  488. this.stationModel.findOne({ _id: payload.stationId }, next);
  489. },
  490. (station, next) => {
  491. if (!station) {
  492. this.cache
  493. .runJob("HDEL", {
  494. table: "stations",
  495. key: payload.stationId
  496. })
  497. .then()
  498. .catch();
  499. return next("Station not found");
  500. }
  501. return this.cache
  502. .runJob("HSET", {
  503. table: "stations",
  504. key: payload.stationId,
  505. value: station
  506. })
  507. .then(station => {
  508. next(null, station);
  509. })
  510. .catch(next);
  511. }
  512. ],
  513. async (err, station) => {
  514. if (err && err !== true) {
  515. err = await this.utils.runJob("GET_ERROR", {
  516. error: err
  517. });
  518. reject(new Error(err));
  519. } else resolve(station);
  520. }
  521. );
  522. });
  523. }
  524. /**
  525. * Creates the official playlist for a station
  526. *
  527. * @param {object} payload - object that contains the payload
  528. * @param {string} payload.stationId - the id of the station
  529. * @param {Array} payload.songList - list of songs to put in official playlist
  530. * @returns {Promise} - returns a promise (resolve, reject)
  531. */
  532. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  533. const officialPlaylistSchema = await this.cache.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" });
  534. console.log(typeof payload.songList, payload.songList);
  535. return new Promise(resolve => {
  536. const lessInfoPlaylist = [];
  537. return async.each(
  538. payload.songList,
  539. (song, next) => {
  540. this.songs
  541. .runJob("GET_SONG", { id: song })
  542. .then(response => {
  543. const { song } = response;
  544. if (song) {
  545. const newSong = {
  546. songId: song.songId,
  547. title: song.title,
  548. artists: song.artists,
  549. duration: song.duration
  550. };
  551. lessInfoPlaylist.push(newSong);
  552. }
  553. })
  554. .finally(() => {
  555. next();
  556. });
  557. },
  558. () => {
  559. this.cache
  560. .runJob("HSET", {
  561. table: "officialPlaylists",
  562. key: payload.stationId,
  563. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  564. })
  565. .finally(() => {
  566. this.cache.runJob("PUB", {
  567. channel: "station.newOfficialPlaylist",
  568. value: payload.stationId
  569. });
  570. resolve();
  571. });
  572. }
  573. );
  574. });
  575. }
  576. /**
  577. * Skips a station
  578. *
  579. * @param {object} payload - object that contains the payload
  580. * @param {string} payload.stationId - the id of the station to skip
  581. * @param {boolean} payload.bypassQueue - UNKNOWN
  582. * @returns {Promise} - returns a promise (resolve, reject)
  583. */
  584. SKIP_STATION(payload) {
  585. return new Promise((resolve, reject) => {
  586. this.log("INFO", `Skipping station ${payload.stationId}.`);
  587. this.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  588. async.waterfall(
  589. [
  590. next => {
  591. this.runJob(
  592. "GET_STATION",
  593. {
  594. stationId: payload.stationId,
  595. bypassQueue: payload.bypassQueue
  596. },
  597. { bypassQueue: payload.bypassQueue }
  598. )
  599. .then(station => {
  600. next(null, station);
  601. })
  602. .catch(() => {});
  603. },
  604. // eslint-disable-next-line consistent-return
  605. (station, next) => {
  606. if (!station) return next("Station not found.");
  607. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  608. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  609. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  610. // Community station with party mode enabled and songs in the queue
  611. if (station.paused) return next(null, null, -19, station);
  612. return this.stationModel.updateOne(
  613. { _id: payload.stationId },
  614. {
  615. $pull: {
  616. queue: {
  617. _id: station.queue[0]._id
  618. }
  619. }
  620. },
  621. err => {
  622. if (err) return next(err);
  623. return next(null, station.queue[0], -12, station);
  624. }
  625. );
  626. }
  627. if (station.type === "community" && !station.partyMode) {
  628. return this.db.runJob("GET_MODEL", { modelName: "playlist" }).then(playlistModel =>
  629. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  630. if (err) return next(err);
  631. if (!playlist) return next(null, null, -13, station);
  632. playlist = playlist.songs;
  633. if (playlist.length > 0) {
  634. let currentSongIndex;
  635. if (station.currentSongIndex < playlist.length - 1)
  636. currentSongIndex = station.currentSongIndex + 1;
  637. else currentSongIndex = 0;
  638. const callback = (err, song) => {
  639. if (err) return next(err);
  640. if (song) return next(null, song, currentSongIndex, station);
  641. const currentSong = {
  642. songId: playlist[currentSongIndex].songId,
  643. title: playlist[currentSongIndex].title,
  644. duration: playlist[currentSongIndex].duration,
  645. likes: -1,
  646. dislikes: -1
  647. };
  648. return next(null, currentSong, currentSongIndex, station);
  649. };
  650. if (playlist[currentSongIndex]._id)
  651. return this.songs
  652. .runJob("GET_SONG", {
  653. id: playlist[currentSongIndex]._id
  654. })
  655. .then(response => callback(null, response.song))
  656. .catch(callback);
  657. return this.songs
  658. .runJob("GET_SONG_FROM_ID", {
  659. songId: playlist[currentSongIndex].songId
  660. })
  661. .then(response => callback(null, response.song))
  662. .catch(callback);
  663. }
  664. return next(null, null, -14, station);
  665. })
  666. );
  667. }
  668. if (station.type === "official" && station.playlist.length === 0) {
  669. return this.runJob(
  670. "CALCULATE_SONG_FOR_STATION",
  671. { station, bypassQueue: payload.bypassQueue },
  672. { bypassQueue: payload.bypassQueue }
  673. )
  674. .then(playlist => {
  675. if (playlist.length === 0) return next(null, this.defaultSong, 0, station);
  676. return this.songs
  677. .runJob("GET_SONG", {
  678. id: playlist[0]
  679. })
  680. .then(response => {
  681. next(null, response.song, 0, station);
  682. })
  683. .catch(() => next(null, this.defaultSong, 0, station));
  684. })
  685. .catch(next);
  686. }
  687. if (station.type === "official" && station.playlist.length > 0) {
  688. return async.doUntil(
  689. next => {
  690. if (station.currentSongIndex < station.playlist.length - 1) {
  691. this.songs
  692. .runJob("GET_SONG", {
  693. id: station.playlist[station.currentSongIndex + 1]
  694. })
  695. .then(response => next(null, response.song, station.currentSongIndex + 1))
  696. .catch(() => {
  697. station.currentSongIndex += 1;
  698. next(null, null, null);
  699. });
  700. } else {
  701. this.runJob(
  702. "CALCULATE_SONG_FOR_STATION",
  703. {
  704. station,
  705. bypassQueue: payload.bypassQueue
  706. },
  707. { bypassQueue: payload.bypassQueue }
  708. )
  709. .then(newPlaylist => {
  710. this.songs
  711. .runJob("GET_SONG", { id: newPlaylist[0] })
  712. .then(response => {
  713. station.playlist = newPlaylist;
  714. next(null, response.song, 0);
  715. })
  716. .catch(() => next(null, this.defaultSong, 0));
  717. })
  718. .catch(() => {
  719. next(null, this.defaultSong, 0);
  720. });
  721. }
  722. },
  723. (song, currentSongIndex, next) => {
  724. if (song) return next(null, true, currentSongIndex);
  725. return next(null, false);
  726. },
  727. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  728. );
  729. }
  730. },
  731. (song, currentSongIndex, station, next) => {
  732. const $set = {};
  733. if (song === null) $set.currentSong = null;
  734. else if (song.likes === -1 && song.dislikes === -1) {
  735. $set.currentSong = {
  736. songId: song.songId,
  737. title: song.title,
  738. duration: song.duration,
  739. skipDuration: 0,
  740. likes: -1,
  741. dislikes: -1
  742. };
  743. } else {
  744. $set.currentSong = {
  745. songId: song.songId,
  746. title: song.title,
  747. artists: song.artists,
  748. duration: song.duration,
  749. likes: song.likes,
  750. dislikes: song.dislikes,
  751. skipDuration: song.skipDuration,
  752. thumbnail: song.thumbnail
  753. };
  754. }
  755. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  756. $set.startedAt = Date.now();
  757. $set.timePaused = 0;
  758. if (station.paused) $set.pausedAt = Date.now();
  759. next(null, $set, station);
  760. },
  761. ($set, station, next) => {
  762. this.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  763. this.runJob(
  764. "UPDATE_STATION",
  765. {
  766. stationId: station._id,
  767. bypassQueue: payload.bypassQueue
  768. },
  769. { bypassQueue: payload.bypassQueue }
  770. )
  771. .then(station => {
  772. if (station.type === "community" && station.partyMode === true)
  773. this.cache
  774. .runJob("PUB", {
  775. channel: "station.queueUpdate",
  776. value: payload.stationId
  777. })
  778. .then()
  779. .catch();
  780. next(null, station);
  781. })
  782. .catch(next);
  783. });
  784. }
  785. ],
  786. async (err, station) => {
  787. if (err) {
  788. err = await this.utils.runJob("GET_ERROR", {
  789. error: err
  790. });
  791. this.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  792. reject(new Error(err));
  793. } else {
  794. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  795. station.currentSong.skipVotes = 0;
  796. }
  797. // TODO Pub/Sub this
  798. this.utils
  799. .runJob("EMIT_TO_ROOM", {
  800. room: `station.${station._id}`,
  801. args: [
  802. "event:songs.next",
  803. {
  804. currentSong: station.currentSong,
  805. startedAt: station.startedAt,
  806. paused: station.paused,
  807. timePaused: 0
  808. }
  809. ]
  810. })
  811. .then()
  812. .catch();
  813. if (station.privacy === "public") {
  814. this.utils
  815. .runJob("EMIT_TO_ROOM", {
  816. room: "home",
  817. args: ["event:station.nextSong", station._id, station.currentSong]
  818. })
  819. .then()
  820. .catch();
  821. } else {
  822. const sockets = await this.utils.runJob("GET_ROOM_SOCKETS", { room: "home" });
  823. for (
  824. let socketId = 0, socketKeys = Object.keys(sockets);
  825. socketId < socketKeys.length;
  826. socketId += 1
  827. ) {
  828. const socket = sockets[socketId];
  829. const { session } = sockets[socketId];
  830. if (session.sessionId) {
  831. this.cache
  832. .runJob("HGET", {
  833. table: "sessions",
  834. key: session.sessionId
  835. })
  836. .then(session => {
  837. if (session) {
  838. this.db.runJob("GET_MODEL", { modelName: "user" }).then(userModel => {
  839. userModel.findOne(
  840. {
  841. _id: session.userId
  842. },
  843. (err, user) => {
  844. if (!err && user) {
  845. if (user.role === "admin")
  846. socket.emit(
  847. "event:station.nextSong",
  848. station._id,
  849. station.currentSong
  850. );
  851. else if (
  852. station.type === "community" &&
  853. station.owner === session.userId
  854. )
  855. socket.emit(
  856. "event:station.nextSong",
  857. station._id,
  858. station.currentSong
  859. );
  860. }
  861. }
  862. );
  863. });
  864. }
  865. });
  866. }
  867. }
  868. }
  869. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  870. this.utils.runJob("SOCKETS_JOIN_SONG_ROOM", {
  871. sockets: await this.utils.runJob("GET_ROOM_SOCKETS", {
  872. room: `station.${station._id}`
  873. }),
  874. room: `song.${station.currentSong.songId}`
  875. });
  876. if (!station.paused) {
  877. this.notifications.runJob("SCHEDULE", {
  878. name: `stations.nextSong?id=${station._id}`,
  879. time: station.currentSong.duration * 1000,
  880. station
  881. });
  882. }
  883. } else {
  884. this.utils
  885. .runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  886. sockets: await this.utils.runJob("GET_ROOM_SOCKETS", {
  887. room: `station.${station._id}`
  888. })
  889. })
  890. .then()
  891. .catch();
  892. }
  893. resolve({ station });
  894. }
  895. }
  896. );
  897. });
  898. }
  899. /**
  900. * Checks if a user can view/access a station
  901. *
  902. * @param {object} payload - object that contains the payload
  903. * @param {object} payload.station - the station object of the station in question
  904. * @param {string} payload.userId - the id of the user in question
  905. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  906. * @returns {Promise} - returns a promise (resolve, reject)
  907. */
  908. CAN_USER_VIEW_STATION(payload) {
  909. return new Promise((resolve, reject) => {
  910. async.waterfall(
  911. [
  912. next => {
  913. if (payload.station.privacy === "public") return next(true);
  914. if (payload.station.privacy === "unlisted")
  915. if (payload.hideUnlisted === true) return next();
  916. else return next(true);
  917. if (!payload.userId) return next("Not allowed");
  918. return next();
  919. },
  920. next => {
  921. this.db
  922. .runJob("GET_MODEL", {
  923. modelName: "user"
  924. })
  925. .then(userModel => {
  926. userModel.findOne({ _id: payload.userId }, next);
  927. });
  928. },
  929. (user, next) => {
  930. if (!user) return next("Not allowed");
  931. if (user.role === "admin") return next(true);
  932. if (payload.station.type === "official") return next("Not allowed");
  933. if (payload.station.owner === payload.userId) return next(true);
  934. return next("Not allowed");
  935. }
  936. ],
  937. async errOrResult => {
  938. if (errOrResult !== true && errOrResult !== "Not allowed") {
  939. errOrResult = await this.utils.runJob("GET_ERROR", {
  940. error: errOrResult
  941. });
  942. reject(new Error(errOrResult));
  943. } else {
  944. resolve(errOrResult === true);
  945. }
  946. }
  947. );
  948. });
  949. }
  950. }
  951. export default new StationsModule();