stations.js 55 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198
  1. const CoreClass = require("../core.js");
  2. const async = require("async");
  3. let subscription = null;
  4. class StationsModule extends CoreClass {
  5. constructor() {
  6. super("stations");
  7. }
  8. initialize() {
  9. return new Promise(async (resolve, reject) => {
  10. this.cache = this.moduleManager.modules["cache"];
  11. this.db = this.moduleManager.modules["db"];
  12. this.utils = this.moduleManager.modules["utils"];
  13. this.songs = this.moduleManager.modules["songs"];
  14. this.notifications = this.moduleManager.modules["notifications"];
  15. this.defaultSong = {
  16. songId: "60ItHLz5WEA",
  17. title: "Faded - Alan Walker",
  18. duration: 212,
  19. skipDuration: 0,
  20. likes: -1,
  21. dislikes: -1,
  22. };
  23. //TEMP
  24. this.cache.runJob("SUB", {
  25. channel: "station.pause",
  26. cb: async (stationId) => {
  27. this.notifications
  28. .runJob("REMOVE", {
  29. subscription: `stations.nextSong?id=${stationId}`,
  30. })
  31. .then();
  32. },
  33. });
  34. this.cache.runJob("SUB", {
  35. channel: "station.resume",
  36. cb: async (stationId) => {
  37. this.runJob("INITIALIZE_STATION", { stationId }).then();
  38. },
  39. });
  40. this.cache.runJob("SUB", {
  41. channel: "station.queueUpdate",
  42. cb: async (stationId) => {
  43. this.runJob("GET_STATION", { stationId }).then(
  44. (station) => {
  45. if (
  46. !station.currentSong &&
  47. station.queue.length > 0
  48. ) {
  49. this.runJob("INITIALIZE_STATION", {
  50. stationId,
  51. }).then();
  52. }
  53. }
  54. );
  55. },
  56. });
  57. this.cache.runJob("SUB", {
  58. channel: "station.newOfficialPlaylist",
  59. cb: async (stationId) => {
  60. this.cache
  61. .runJob("HGET", {
  62. table: "officialPlaylists",
  63. key: stationId,
  64. })
  65. .then((playlistObj) => {
  66. if (playlistObj) {
  67. this.utils.runJob("EMIT_TO_ROOM", {
  68. room: `station.${stationId}`,
  69. args: [
  70. "event:newOfficialPlaylist",
  71. playlistObj.songs,
  72. ],
  73. });
  74. }
  75. });
  76. },
  77. });
  78. const stationModel = (this.stationModel = await this.db.runJob(
  79. "GET_MODEL",
  80. {
  81. modelName: "station",
  82. }
  83. ));
  84. const stationSchema = (this.stationSchema = await this.cache.runJob(
  85. "GET_SCHEMA",
  86. {
  87. schemaName: "station",
  88. }
  89. ));
  90. async.waterfall(
  91. [
  92. (next) => {
  93. this.setStage(2);
  94. this.cache
  95. .runJob("HGETALL", { table: "stations" })
  96. .then((stations) => {
  97. next(null, stations);
  98. })
  99. .catch(next);
  100. },
  101. (stations, next) => {
  102. this.setStage(3);
  103. if (!stations) return next();
  104. let stationIds = Object.keys(stations);
  105. async.each(
  106. stationIds,
  107. (stationId, next) => {
  108. stationModel.findOne(
  109. { _id: stationId },
  110. (err, station) => {
  111. if (err) next(err);
  112. else if (!station) {
  113. this.cache
  114. .runJob("HDEL", {
  115. table: "stations",
  116. key: stationId,
  117. })
  118. .then(() => {
  119. next();
  120. })
  121. .catch(next);
  122. } else next();
  123. }
  124. );
  125. },
  126. next
  127. );
  128. },
  129. (next) => {
  130. this.setStage(4);
  131. stationModel.find({}, next);
  132. },
  133. (stations, next) => {
  134. this.setStage(5);
  135. async.each(
  136. stations,
  137. (station, next2) => {
  138. async.waterfall(
  139. [
  140. (next) => {
  141. this.cache
  142. .runJob("HSET", {
  143. table: "stations",
  144. key: station._id,
  145. value: stationSchema(
  146. station
  147. ),
  148. })
  149. .then((station) =>
  150. next(null, station)
  151. )
  152. .catch(next);
  153. },
  154. (station, next) => {
  155. this.runJob(
  156. "INITIALIZE_STATION",
  157. {
  158. stationId: station._id,
  159. bypassQueue: true,
  160. },
  161. { bypassQueue: true }
  162. )
  163. .then(() => {
  164. next();
  165. })
  166. .catch(next); // bypassQueue is true because otherwise the module will never initialize
  167. },
  168. ],
  169. (err) => {
  170. next2(err);
  171. }
  172. );
  173. },
  174. next
  175. );
  176. },
  177. ],
  178. async (err) => {
  179. if (err) {
  180. err = await this.utils.runJob("GET_ERROR", {
  181. error: err,
  182. });
  183. reject(new Error(err));
  184. } else {
  185. resolve();
  186. }
  187. }
  188. );
  189. });
  190. }
  191. INITIALIZE_STATION(payload) {
  192. //stationId, cb, bypassValidate = false
  193. return new Promise((resolve, reject) => {
  194. // if (typeof cb !== 'function') cb = ()=>{};
  195. async.waterfall(
  196. [
  197. (next) => {
  198. this.runJob(
  199. "GET_STATION",
  200. {
  201. stationId: payload.stationId,
  202. bypassQueue: payload.bypassQueue,
  203. },
  204. { bypassQueue: payload.bypassQueue }
  205. )
  206. .then((station) => {
  207. next(null, station);
  208. })
  209. .catch(next);
  210. },
  211. (station, next) => {
  212. if (!station) return next("Station not found.");
  213. this.notifications
  214. .runJob("UNSCHEDULE", {
  215. name: `stations.nextSong?id=${station._id}`,
  216. })
  217. .then()
  218. .catch();
  219. this.notifications
  220. .runJob("SUBSCRIBE", {
  221. name: `stations.nextSong?id=${station._id}`,
  222. cb: () =>
  223. this.runJob("SKIP_STATION", {
  224. stationId: station._id,
  225. }),
  226. unique: true,
  227. station,
  228. })
  229. .then()
  230. .catch();
  231. if (station.paused) return next(true, station);
  232. next(null, station);
  233. },
  234. (station, next) => {
  235. if (!station.currentSong) {
  236. return this.runJob(
  237. "SKIP_STATION",
  238. {
  239. stationId: station._id,
  240. bypassQueue: payload.bypassQueue,
  241. },
  242. { bypassQueue: payload.bypassQueue }
  243. )
  244. .then((station) => {
  245. next(true, station);
  246. })
  247. .catch(next)
  248. .finally(() => {});
  249. }
  250. let timeLeft =
  251. station.currentSong.duration * 1000 -
  252. (Date.now() -
  253. station.startedAt -
  254. station.timePaused);
  255. if (isNaN(timeLeft)) timeLeft = -1;
  256. if (
  257. station.currentSong.duration * 1000 < timeLeft ||
  258. timeLeft < 0
  259. ) {
  260. this.runJob(
  261. "SKIP_STATION",
  262. {
  263. stationId: station._id,
  264. bypassQueue: payload.bypassQueue,
  265. },
  266. { bypassQueue: payload.bypassQueue }
  267. )
  268. .then((station) => {
  269. next(null, station);
  270. })
  271. .catch(next);
  272. } else {
  273. //name, time, cb, station
  274. this.notifications.runJob("SCHEDULE", {
  275. name: `stations.nextSong?id=${station._id}`,
  276. time: timeLeft,
  277. station,
  278. });
  279. next(null, station);
  280. }
  281. },
  282. ],
  283. async (err, station) => {
  284. if (err && err !== true) {
  285. err = await this.utils.runJob("GET_ERROR", {
  286. error: err,
  287. });
  288. reject(new Error(err));
  289. } else resolve(station);
  290. }
  291. );
  292. });
  293. }
  294. CALCULATE_SONG_FOR_STATION(payload) {
  295. //station, cb, bypassValidate = false
  296. return new Promise(async (resolve, reject) => {
  297. const songModel = await this.db.runJob("GET_MODEL", {
  298. modelName: "song",
  299. });
  300. const stationModel = await this.db.runJob("GET_MODEL", {
  301. modelName: "station",
  302. });
  303. let songList = [];
  304. async.waterfall(
  305. [
  306. (next) => {
  307. if (payload.station.genres.length === 0) return next();
  308. let genresDone = [];
  309. payload.station.genres.forEach((genre) => {
  310. songModel.find({ genres: genre }, (err, songs) => {
  311. if (!err) {
  312. songs.forEach((song) => {
  313. if (songList.indexOf(song._id) === -1) {
  314. let found = false;
  315. song.genres.forEach((songGenre) => {
  316. if (
  317. payload.station.blacklistedGenres.indexOf(
  318. songGenre
  319. ) !== -1
  320. )
  321. found = true;
  322. });
  323. if (!found) {
  324. songList.push(song._id);
  325. }
  326. }
  327. });
  328. }
  329. genresDone.push(genre);
  330. if (
  331. genresDone.length ===
  332. payload.station.genres.length
  333. )
  334. next();
  335. });
  336. });
  337. },
  338. (next) => {
  339. let playlist = [];
  340. songList.forEach(function(songId) {
  341. if (payload.station.playlist.indexOf(songId) === -1)
  342. playlist.push(songId);
  343. });
  344. payload.station.playlist.filter((songId) => {
  345. if (songList.indexOf(songId) !== -1)
  346. playlist.push(songId);
  347. });
  348. this.utils
  349. .runJob("SHUFFLE", { array: playlist })
  350. .then((result) => {
  351. next(null, result.array);
  352. })
  353. .catch(next);
  354. },
  355. (playlist, next) => {
  356. this.runJob(
  357. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  358. {
  359. stationId: payload.station._id,
  360. songList: playlist,
  361. bypassQueue: payload.bypassQueue,
  362. },
  363. { bypassQueue: payload.bypassQueue }
  364. )
  365. .then(() => {
  366. next(null, playlist);
  367. })
  368. .catch(next);
  369. },
  370. (playlist, next) => {
  371. stationModel.updateOne(
  372. { _id: payload.station._id },
  373. { $set: { playlist: playlist } },
  374. { runValidators: true },
  375. (err) => {
  376. this.runJob(
  377. "UPDATE_STATION",
  378. {
  379. stationId: payload.station._id,
  380. bypassQueue: payload.bypassQueue,
  381. },
  382. { bypassQueue: payload.bypassQueue }
  383. )
  384. .then(() => {
  385. next(null, playlist);
  386. })
  387. .catch(next);
  388. }
  389. );
  390. },
  391. ],
  392. (err, newPlaylist) => {
  393. if (err) return reject(new Error(err));
  394. resolve(newPlaylist);
  395. }
  396. );
  397. });
  398. }
  399. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  400. GET_STATION(payload) {
  401. //stationId, cb, bypassValidate = false
  402. return new Promise((resolve, reject) => {
  403. async.waterfall(
  404. [
  405. (next) => {
  406. this.cache
  407. .runJob("HGET", {
  408. table: "stations",
  409. key: payload.stationId,
  410. })
  411. .then((station) => {
  412. next(null, station);
  413. })
  414. .catch(next);
  415. },
  416. (station, next) => {
  417. if (station) return next(true, station);
  418. this.stationModel.findOne(
  419. { _id: payload.stationId },
  420. next
  421. );
  422. },
  423. (station, next) => {
  424. if (station) {
  425. if (station.type === "official") {
  426. this.runJob(
  427. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  428. {
  429. stationId: station._id,
  430. songList: station.playlist,
  431. }
  432. )
  433. .then()
  434. .catch();
  435. }
  436. station = this.stationSchema(station);
  437. this.cache
  438. .runJob("HSET", {
  439. table: "stations",
  440. key: payload.stationId,
  441. value: station,
  442. })
  443. .then()
  444. .catch();
  445. next(true, station);
  446. } else next("Station not found");
  447. },
  448. ],
  449. async (err, station) => {
  450. if (err && err !== true) {
  451. err = await this.utils.runJob("GET_ERROR", {
  452. error: err,
  453. });
  454. reject(new Error(err));
  455. } else resolve(station);
  456. }
  457. );
  458. });
  459. }
  460. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  461. GET_STATION_BY_NAME(payload) {
  462. //stationName, cb
  463. return new Promise(async (resolve, reject) => {
  464. const stationModel = await this.db.runJob("GET_MODEL", {
  465. modelName: "station",
  466. });
  467. async.waterfall(
  468. [
  469. (next) => {
  470. stationModel.findOne(
  471. { name: payload.stationName },
  472. next
  473. );
  474. },
  475. (station, next) => {
  476. if (station) {
  477. if (station.type === "official") {
  478. this.runJob(
  479. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  480. {
  481. stationId: station._id,
  482. songList: station.playlist,
  483. }
  484. );
  485. }
  486. this.cache
  487. .runJob("GET_SCHEMA", { schemaName: "station" })
  488. .then((stationSchema) => {
  489. station = stationSchema(station);
  490. this.cache.runJob("HSET", {
  491. table: "stations",
  492. key: station._id,
  493. value: station,
  494. });
  495. next(true, station);
  496. });
  497. } else next("Station not found");
  498. },
  499. ],
  500. (err, station) => {
  501. if (err && err !== true) return reject(new Error(err));
  502. resolve(station);
  503. }
  504. );
  505. });
  506. }
  507. UPDATE_STATION(payload) {
  508. //stationId, cb, bypassValidate = false
  509. return new Promise((resolve, reject) => {
  510. async.waterfall(
  511. [
  512. (next) => {
  513. this.stationModel.findOne(
  514. { _id: payload.stationId },
  515. next
  516. );
  517. },
  518. (station, next) => {
  519. if (!station) {
  520. this.cache
  521. .runJob("HDEL", {
  522. table: "stations",
  523. key: payload.stationId,
  524. })
  525. .then()
  526. .catch();
  527. return next("Station not found");
  528. }
  529. this.cache
  530. .runJob("HSET", {
  531. table: "stations",
  532. key: payload.stationId,
  533. value: station,
  534. })
  535. .then((station) => {
  536. next(null, station);
  537. })
  538. .catch(next);
  539. },
  540. ],
  541. async (err, station) => {
  542. if (err && err !== true) {
  543. err = await this.utils.runJob("GET_ERROR", {
  544. error: err,
  545. });
  546. reject(new Error(err));
  547. } else resolve(station);
  548. }
  549. );
  550. });
  551. }
  552. CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  553. //stationId, songList, cb, bypassValidate = false
  554. return new Promise(async (resolve, reject) => {
  555. const officialPlaylistSchema = await this.cache.runJob(
  556. "GET_SCHEMA",
  557. {
  558. schemaName: "officialPlaylist",
  559. }
  560. );
  561. let lessInfoPlaylist = [];
  562. async.each(
  563. payload.songList,
  564. (song, next) => {
  565. this.songs
  566. .runJob("GET_SONG", { id: song })
  567. .then((response) => {
  568. const song = response.song;
  569. if (song) {
  570. let newSong = {
  571. songId: song.songId,
  572. title: song.title,
  573. artists: song.artists,
  574. duration: song.duration,
  575. };
  576. lessInfoPlaylist.push(newSong);
  577. }
  578. })
  579. .finally(() => {
  580. next();
  581. });
  582. },
  583. () => {
  584. this.cache
  585. .runJob("HSET", {
  586. table: "officialPlaylists",
  587. key: payload.stationId,
  588. value: officialPlaylistSchema(
  589. payload.stationId,
  590. lessInfoPlaylist
  591. ),
  592. })
  593. .finally(() => {
  594. this.cache.runJob("PUB", {
  595. channel: "station.newOfficialPlaylist",
  596. value: payload.stationId,
  597. });
  598. resolve();
  599. });
  600. }
  601. );
  602. });
  603. }
  604. SKIP_STATION(payload) {
  605. //stationId
  606. return new Promise((resolve, reject) => {
  607. this.log("INFO", `Skipping station ${payload.stationId}.`);
  608. this.log(
  609. "STATION_ISSUE",
  610. `SKIP_STATION_CB - Station ID: ${payload.stationId}.`
  611. );
  612. async.waterfall(
  613. [
  614. (next) => {
  615. this.runJob(
  616. "GET_STATION",
  617. {
  618. stationId: payload.stationId,
  619. bypassQueue: payload.bypassQueue,
  620. },
  621. { bypassQueue: payload.bypassQueue }
  622. )
  623. .then((station) => {
  624. next(null, station);
  625. })
  626. .catch(() => {});
  627. },
  628. (station, next) => {
  629. if (!station) return next("Station not found.");
  630. if (
  631. station.type === "community" &&
  632. station.partyMode &&
  633. station.queue.length === 0
  634. )
  635. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  636. if (
  637. station.type === "community" &&
  638. station.partyMode &&
  639. station.queue.length > 0
  640. ) {
  641. // Community station with party mode enabled and songs in the queue
  642. if (station.paused) {
  643. return next(null, null, -19, station);
  644. } else {
  645. return this.stationModel.updateOne(
  646. { _id: payload.stationId },
  647. {
  648. $pull: {
  649. queue: {
  650. _id: station.queue[0]._id,
  651. },
  652. },
  653. },
  654. (err) => {
  655. if (err) return next(err);
  656. next(
  657. null,
  658. station.queue[0],
  659. -12,
  660. station
  661. );
  662. }
  663. );
  664. }
  665. }
  666. if (
  667. station.type === "community" &&
  668. !station.partyMode
  669. ) {
  670. this.db
  671. .runJob("GET_MODEL", { modelName: "playlist" })
  672. .then((playlistModel) => {
  673. return playlistModel.findOne(
  674. { _id: station.privatePlaylist },
  675. (err, playlist) => {
  676. if (err) return next(err);
  677. if (!playlist)
  678. return next(
  679. null,
  680. null,
  681. -13,
  682. station
  683. );
  684. playlist = playlist.songs;
  685. if (playlist.length > 0) {
  686. let currentSongIndex;
  687. if (
  688. station.currentSongIndex <
  689. playlist.length - 1
  690. )
  691. currentSongIndex =
  692. station.currentSongIndex +
  693. 1;
  694. else currentSongIndex = 0;
  695. let callback = (err, song) => {
  696. if (err) return next(err);
  697. if (song)
  698. return next(
  699. null,
  700. song,
  701. currentSongIndex,
  702. station
  703. );
  704. else {
  705. let song =
  706. playlist[
  707. currentSongIndex
  708. ];
  709. let currentSong = {
  710. songId: song.songId,
  711. title: song.title,
  712. duration:
  713. song.duration,
  714. likes: -1,
  715. dislikes: -1,
  716. };
  717. return next(
  718. null,
  719. currentSong,
  720. currentSongIndex,
  721. station
  722. );
  723. }
  724. };
  725. if (
  726. playlist[currentSongIndex]
  727. ._id
  728. )
  729. this.songs
  730. .runJob("GET_SONG", {
  731. id:
  732. playlist[
  733. currentSongIndex
  734. ]._id,
  735. })
  736. .then((response) =>
  737. callback(
  738. null,
  739. response.song
  740. )
  741. )
  742. .catch(callback);
  743. else
  744. this.songs
  745. .runJob(
  746. "GET_SONG_FROM_ID",
  747. {
  748. songId:
  749. playlist[
  750. currentSongIndex
  751. ].songId,
  752. }
  753. )
  754. .then((response) =>
  755. callback(
  756. null,
  757. response.song
  758. )
  759. )
  760. .catch(callback);
  761. } else
  762. return next(
  763. null,
  764. null,
  765. -14,
  766. station
  767. );
  768. }
  769. );
  770. });
  771. }
  772. if (
  773. station.type === "official" &&
  774. station.playlist.length === 0
  775. ) {
  776. return this.runJob(
  777. "CALCULATE_SONG_FOR_STATION",
  778. { station, bypassQueue: payload.bypassQueue },
  779. { bypassQueue: payload.bypassQueue }
  780. )
  781. .then((playlist) => {
  782. if (playlist.length === 0)
  783. return next(
  784. null,
  785. this.defaultSong,
  786. 0,
  787. station
  788. );
  789. else {
  790. this.songs
  791. .runJob("GET_SONG", {
  792. id: playlist[0],
  793. })
  794. .then((response) => {
  795. next(
  796. null,
  797. response.song,
  798. 0,
  799. station
  800. );
  801. })
  802. .catch((err) => {
  803. return next(
  804. null,
  805. this.defaultSong,
  806. 0,
  807. station
  808. );
  809. });
  810. }
  811. })
  812. .catch(next);
  813. }
  814. if (
  815. station.type === "official" &&
  816. station.playlist.length > 0
  817. ) {
  818. async.doUntil(
  819. (next) => {
  820. if (
  821. station.currentSongIndex <
  822. station.playlist.length - 1
  823. ) {
  824. this.songs
  825. .runJob("GET_SONG", {
  826. id:
  827. station.playlist[
  828. station.currentSongIndex +
  829. 1
  830. ],
  831. })
  832. .then((response) => {
  833. return next(
  834. null,
  835. response.song,
  836. station.currentSongIndex + 1
  837. );
  838. })
  839. .catch((err) => {
  840. station.currentSongIndex++;
  841. next(null, null, null);
  842. });
  843. } else {
  844. this.runJob(
  845. "CALCULATE_SONG_FOR_STATION",
  846. {
  847. station,
  848. bypassQueue:
  849. payload.bypassQueue,
  850. },
  851. { bypassQueue: payload.bypassQueue }
  852. )
  853. .then((newPlaylist) => {
  854. this.songs.getSong(
  855. newPlaylist[0],
  856. (err, song) => {
  857. if (err || !song)
  858. return next(
  859. null,
  860. this
  861. .defaultSong,
  862. 0
  863. );
  864. station.playlist = newPlaylist;
  865. next(null, song, 0);
  866. }
  867. );
  868. })
  869. .catch((err) => {
  870. next(null, this.defaultSong, 0);
  871. });
  872. }
  873. },
  874. (song, currentSongIndex, next) => {
  875. if (!!song)
  876. return next(
  877. null,
  878. true,
  879. currentSongIndex
  880. );
  881. else return next(null, false);
  882. },
  883. (err, song, currentSongIndex) => {
  884. return next(
  885. err,
  886. song,
  887. currentSongIndex,
  888. station
  889. );
  890. }
  891. );
  892. }
  893. },
  894. (song, currentSongIndex, station, next) => {
  895. let $set = {};
  896. if (song === null) $set.currentSong = null;
  897. else if (song.likes === -1 && song.dislikes === -1) {
  898. $set.currentSong = {
  899. songId: song.songId,
  900. title: song.title,
  901. duration: song.duration,
  902. skipDuration: 0,
  903. likes: -1,
  904. dislikes: -1,
  905. };
  906. } else {
  907. $set.currentSong = {
  908. songId: song.songId,
  909. title: song.title,
  910. artists: song.artists,
  911. duration: song.duration,
  912. likes: song.likes,
  913. dislikes: song.dislikes,
  914. skipDuration: song.skipDuration,
  915. thumbnail: song.thumbnail,
  916. };
  917. }
  918. if (currentSongIndex >= 0)
  919. $set.currentSongIndex = currentSongIndex;
  920. $set.startedAt = Date.now();
  921. $set.timePaused = 0;
  922. if (station.paused) $set.pausedAt = Date.now();
  923. next(null, $set, station);
  924. },
  925. ($set, station, next) => {
  926. this.stationModel.updateOne(
  927. { _id: station._id },
  928. { $set },
  929. (err) => {
  930. this.runJob(
  931. "UPDATE_STATION",
  932. {
  933. stationId: station._id,
  934. bypassQueue: payload.bypassQueue,
  935. },
  936. { bypassQueue: payload.bypassQueue }
  937. )
  938. .then((station) => {
  939. if (
  940. station.type === "community" &&
  941. station.partyMode === true
  942. )
  943. this.cache
  944. .runJob("PUB", {
  945. channel:
  946. "station.queueUpdate",
  947. value: payload.stationId,
  948. })
  949. .then()
  950. .catch();
  951. next(null, station);
  952. })
  953. .catch(next);
  954. }
  955. );
  956. },
  957. ],
  958. async (err, station) => {
  959. if (err) {
  960. err = await this.utils.runJob("GET_ERROR", {
  961. error: err,
  962. });
  963. this.log(
  964. "ERROR",
  965. `Skipping station "${payload.stationId}" failed. "${err}"`
  966. );
  967. reject(new Error(err));
  968. } else {
  969. if (
  970. station.currentSong !== null &&
  971. station.currentSong.songId !== undefined
  972. ) {
  973. station.currentSong.skipVotes = 0;
  974. }
  975. //TODO Pub/Sub this
  976. this.utils
  977. .runJob("EMIT_TO_ROOM", {
  978. room: `station.${station._id}`,
  979. args: [
  980. "event:songs.next",
  981. {
  982. currentSong: station.currentSong,
  983. startedAt: station.startedAt,
  984. paused: station.paused,
  985. timePaused: 0,
  986. },
  987. ],
  988. })
  989. .then()
  990. .catch();
  991. if (station.privacy === "public") {
  992. this.utils
  993. .runJob("EMIT_TO_ROOM", {
  994. room: "home",
  995. args: [
  996. "event:station.nextSong",
  997. station._id,
  998. station.currentSong,
  999. ],
  1000. })
  1001. .then()
  1002. .catch();
  1003. } else {
  1004. let sockets = await this.utils.runJob(
  1005. "GET_ROOM_SOCKETS",
  1006. { room: "home" }
  1007. );
  1008. for (let socketId in sockets) {
  1009. let socket = sockets[socketId];
  1010. let session = sockets[socketId].session;
  1011. if (session.sessionId) {
  1012. this.cache
  1013. .runJob("HGET", {
  1014. table: "sessions",
  1015. key: session.sessionId,
  1016. })
  1017. .then((session) => {
  1018. if (session) {
  1019. this.db
  1020. .runJob("GET_MODEL", {
  1021. modelName: "user",
  1022. })
  1023. .then((userModel) => {
  1024. userModel.findOne(
  1025. {
  1026. _id:
  1027. session.userId,
  1028. },
  1029. (err, user) => {
  1030. if (
  1031. !err &&
  1032. user
  1033. ) {
  1034. if (
  1035. user.role ===
  1036. "admin"
  1037. )
  1038. socket.emit(
  1039. "event:station.nextSong",
  1040. station._id,
  1041. station.currentSong
  1042. );
  1043. else if (
  1044. station.type ===
  1045. "community" &&
  1046. station.owner ===
  1047. session.userId
  1048. )
  1049. socket.emit(
  1050. "event:station.nextSong",
  1051. station._id,
  1052. station.currentSong
  1053. );
  1054. }
  1055. }
  1056. );
  1057. });
  1058. }
  1059. });
  1060. }
  1061. }
  1062. }
  1063. if (
  1064. station.currentSong !== null &&
  1065. station.currentSong.songId !== undefined
  1066. ) {
  1067. this.utils.runJob("SOCKETS_JOIN_SONG_ROOM", {
  1068. sockets: await this.utils.runJob(
  1069. "GET_ROOM_SOCKETS",
  1070. { room: `station.${station._id}` }
  1071. ),
  1072. room: `song.${station.currentSong.songId}`,
  1073. });
  1074. if (!station.paused) {
  1075. this.notifications.runJob("SCHEDULE", {
  1076. name: `stations.nextSong?id=${station._id}`,
  1077. time: station.currentSong.duration * 1000,
  1078. station,
  1079. });
  1080. }
  1081. } else {
  1082. this.utils
  1083. .runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  1084. sockets: await this.utils.runJob(
  1085. "GET_ROOM_SOCKETS",
  1086. { room: `station.${station._id}` }
  1087. ),
  1088. })
  1089. .then()
  1090. .catch();
  1091. }
  1092. resolve({ station: station });
  1093. }
  1094. }
  1095. );
  1096. });
  1097. }
  1098. CAN_USER_VIEW_STATION(payload) {
  1099. // station, userId, hideUnlisted, cb
  1100. return new Promise((resolve, reject) => {
  1101. async.waterfall(
  1102. [
  1103. (next) => {
  1104. if (payload.station.privacy === "public")
  1105. return next(true);
  1106. if (payload.station.privacy === "unlisted")
  1107. if (payload.hideUnlisted === true)
  1108. return next();
  1109. else
  1110. return next(true);
  1111. if (!payload.userId) return next("Not allowed");
  1112. next();
  1113. },
  1114. (next) => {
  1115. this.db
  1116. .runJob("GET_MODEL", {
  1117. modelName: "user",
  1118. })
  1119. .then((userModel) => {
  1120. userModel.findOne(
  1121. { _id: payload.userId },
  1122. next
  1123. );
  1124. });
  1125. },
  1126. (user, next) => {
  1127. if (!user) return next("Not allowed");
  1128. if (user.role === "admin") return next(true);
  1129. if (payload.station.type === "official")
  1130. return next("Not allowed");
  1131. if (payload.station.owner === payload.userId)
  1132. return next(true);
  1133. next("Not allowed");
  1134. },
  1135. ],
  1136. async (errOrResult) => {
  1137. if (errOrResult !== true && errOrResult !== "Not allowed") {
  1138. errOrResult = await this.utils.runJob("GET_ERROR", {
  1139. error: errOrResult,
  1140. });
  1141. reject(new Error(errOrResult));
  1142. } else {
  1143. resolve(errOrResult === true ? true : false);
  1144. }
  1145. }
  1146. );
  1147. });
  1148. }
  1149. }
  1150. module.exports = new StationsModule();