// stations.js
  1. const CoreClass = require("../core.js");
  2. const async = require("async");
  3. let subscription = null;
  4. class StationsModule extends CoreClass {
  5. constructor() {
  6. super("stations");
  7. }
  8. initialize() {
  9. return new Promise(async (resolve, reject) => {
  10. this.cache = this.moduleManager.modules["cache"];
  11. this.db = this.moduleManager.modules["db"];
  12. this.utils = this.moduleManager.modules["utils"];
  13. this.songs = this.moduleManager.modules["songs"];
  14. this.notifications = this.moduleManager.modules["notifications"];
  15. this.defaultSong = {
  16. songId: "60ItHLz5WEA",
  17. title: "Faded - Alan Walker",
  18. duration: 212,
  19. skipDuration: 0,
  20. likes: -1,
  21. dislikes: -1,
  22. };
  23. //TEMP
  24. this.cache.runJob("SUB", {
  25. channel: "station.pause",
  26. cb: async (stationId) => {
  27. this.notifications
  28. .runJob("REMOVE", {
  29. subscription: `stations.nextSong?id=${stationId}`,
  30. })
  31. .then();
  32. },
  33. });
  34. this.cache.runJob("SUB", {
  35. channel: "station.resume",
  36. cb: async (stationId) => {
  37. this.runJob("INITIALIZE_STATION", { stationId }).then();
  38. },
  39. });
  40. this.cache.runJob("SUB", {
  41. channel: "station.queueUpdate",
  42. cb: async (stationId) => {
  43. this.runJob("GET_STATION", { stationId }).then(
  44. (station) => {
  45. if (
  46. !station.currentSong &&
  47. station.queue.length > 0
  48. ) {
  49. this.runJob("INITIALIZE_STATION", {
  50. stationId,
  51. }).then();
  52. }
  53. }
  54. );
  55. },
  56. });
  57. this.cache.runJob("SUB", {
  58. channel: "station.newOfficialPlaylist",
  59. cb: async (stationId) => {
  60. this.cache
  61. .runJob("HGET", {
  62. table: "officialPlaylists",
  63. key: stationId,
  64. })
  65. .then((playlistObj) => {
  66. if (playlistObj) {
  67. this.utils.runJob("EMIT_TO_ROOM", {
  68. room: `station.${stationId}`,
  69. args: [
  70. "event:newOfficialPlaylist",
  71. playlistObj.songs,
  72. ],
  73. });
  74. }
  75. });
  76. },
  77. });
  78. const stationModel = (this.stationModel = await this.db.runJob(
  79. "GET_MODEL",
  80. {
  81. modelName: "station",
  82. }
  83. ));
  84. const stationSchema = (this.stationSchema = await this.cache.runJob(
  85. "GET_SCHEMA",
  86. {
  87. schemaName: "station",
  88. }
  89. ));
  90. async.waterfall(
  91. [
  92. (next) => {
  93. this.setStage(2);
  94. this.cache
  95. .runJob("HGETALL", { table: "stations" })
  96. .then((stations) => next(null, stations))
  97. .catch(next);
  98. },
  99. (stations, next) => {
  100. this.setStage(3);
  101. if (!stations) return next();
  102. let stationIds = Object.keys(stations);
  103. async.each(
  104. stationIds,
  105. (stationId, next) => {
  106. stationModel.findOne(
  107. { _id: stationId },
  108. (err, station) => {
  109. if (err) next(err);
  110. else if (!station) {
  111. this.cache
  112. .runJob("HDEL", {
  113. table: "stations",
  114. key: stationId,
  115. })
  116. .then(() => next())
  117. .catch(next);
  118. } else next();
  119. }
  120. );
  121. },
  122. next
  123. );
  124. },
  125. (next) => {
  126. this.setStage(4);
  127. stationModel.find({}, next);
  128. },
  129. (stations, next) => {
  130. this.setStage(5);
  131. async.each(
  132. stations,
  133. (station, next2) => {
  134. async.waterfall(
  135. [
  136. (next) => {
  137. this.cache
  138. .runJob("HSET", {
  139. table: "stations",
  140. key: station._id,
  141. value: stationSchema(
  142. station
  143. ),
  144. })
  145. .then((station) =>
  146. next(null, station)
  147. )
  148. .catch(next);
  149. },
  150. (station, next) => {
  151. this.runJob(
  152. "INITIALIZE_STATION",
  153. {
  154. stationId: station._id,
  155. bypassQueue: true,
  156. },
  157. { bypassQueue: true }
  158. )
  159. .then(() => next())
  160. .catch(next); // bypassQueue is true because otherwise the module will never initialize
  161. },
  162. ],
  163. (err) => {
  164. next2(err);
  165. }
  166. );
  167. },
  168. next
  169. );
  170. },
  171. ],
  172. async (err) => {
  173. if (err) {
  174. err = await this.utils.runJob("GET_ERROR", {
  175. error: err,
  176. });
  177. reject(new Error(err));
  178. } else {
  179. resolve();
  180. }
  181. }
  182. );
  183. });
  184. }
  185. INITIALIZE_STATION(payload) {
  186. //stationId, cb, bypassValidate = false
  187. return new Promise((resolve, reject) => {
  188. // if (typeof cb !== 'function') cb = ()=>{};
  189. async.waterfall(
  190. [
  191. (next) => {
  192. this.runJob(
  193. "GET_STATION",
  194. {
  195. stationId: payload.stationId,
  196. bypassQueue: payload.bypassQueue,
  197. },
  198. { bypassQueue: payload.bypassQueue }
  199. )
  200. .then((station) => next(null, station))
  201. .catch(next);
  202. },
  203. (station, next) => {
  204. if (!station) return next("Station not found.");
  205. this.notifications
  206. .runJob("UNSCHEDULE", {
  207. subscription: `stations.nextSong?id=${station._id}`,
  208. })
  209. .then()
  210. .catch();
  211. this.notifications
  212. .runJob("SUBSCRIBE", {
  213. subscription: `stations.nextSong?id=${station._id}`,
  214. cb: () =>
  215. this.runJob("SKIP_STATION", {
  216. stationId: station._id,
  217. }),
  218. unique: true,
  219. station,
  220. })
  221. .then()
  222. .catch();
  223. if (station.paused) return next(true, station);
  224. next(null, station);
  225. },
  226. (station, next) => {
  227. if (!station.currentSong) {
  228. return this.runJob(
  229. "SKIP_STATION",
  230. {
  231. stationId: station._id,
  232. bypassQueue: payload.bypassQueue,
  233. },
  234. { bypassQueue: payload.bypassQueue }
  235. )
  236. .then((station) => next(true, station))
  237. .catch(next)
  238. .finally(() => {});
  239. }
  240. let timeLeft =
  241. station.currentSong.duration * 1000 -
  242. (Date.now() -
  243. station.startedAt -
  244. station.timePaused);
  245. if (isNaN(timeLeft)) timeLeft = -1;
  246. if (
  247. station.currentSong.duration * 1000 < timeLeft ||
  248. timeLeft < 0
  249. ) {
  250. this.runJob(
  251. "SKIP_STATION",
  252. {
  253. stationId: station._id,
  254. bypassQueue: payload.bypassQueue,
  255. },
  256. { bypassQueue: payload.bypassQueue }
  257. )
  258. .then((station) => next(null, station))
  259. .catch(next);
  260. } else {
  261. //name, time, cb, station
  262. this.notifications.runJob("SCHEDULE", {
  263. name: `stations.nextSong?id=${station._id}`,
  264. time: timeLeft,
  265. station,
  266. });
  267. next(null, station);
  268. }
  269. },
  270. ],
  271. async (err, station) => {
  272. if (err && err !== true) {
  273. err = await this.utils.runJob("GET_ERROR", {
  274. error: err,
  275. });
  276. reject(new Error(err));
  277. } else resolve(station);
  278. }
  279. );
  280. });
  281. }
  282. CALCULATE_SONG_FOR_STATION(payload) {
  283. //station, cb, bypassValidate = false
  284. return new Promise(async (resolve, reject) => {
  285. const songModel = await this.db.runJob("GET_MODEL", {
  286. modelName: "song",
  287. });
  288. const stationModel = await this.db.runJob("GET_MODEL", {
  289. modelName: "station",
  290. });
  291. let songList = [];
  292. async.waterfall(
  293. [
  294. (next) => {
  295. if (payload.station.genres.length === 0) return next();
  296. let genresDone = [];
  297. payload.station.genres.forEach((genre) => {
  298. songModel.find({ genres: genre }, (err, songs) => {
  299. if (!err) {
  300. songs.forEach((song) => {
  301. if (songList.indexOf(song._id) === -1) {
  302. let found = false;
  303. song.genres.forEach((songGenre) => {
  304. if (
  305. payload.station.blacklistedGenres.indexOf(
  306. songGenre
  307. ) !== -1
  308. )
  309. found = true;
  310. });
  311. if (!found) {
  312. songList.push(song._id);
  313. }
  314. }
  315. });
  316. }
  317. genresDone.push(genre);
  318. if (
  319. genresDone.length ===
  320. payload.station.genres.length
  321. )
  322. next();
  323. });
  324. });
  325. },
  326. (next) => {
  327. let playlist = [];
  328. songList.forEach(function(songId) {
  329. if (payload.station.playlist.indexOf(songId) === -1)
  330. playlist.push(songId);
  331. });
  332. payload.station.playlist.filter((songId) => {
  333. if (songList.indexOf(songId) !== -1)
  334. playlist.push(songId);
  335. });
  336. this.utils
  337. .runJob("SHUFFLE", { array: playlist })
  338. .then((result) => next(null, result.array))
  339. .catch(next);
  340. },
  341. (playlist, next) => {
  342. this.runJob(
  343. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  344. {
  345. stationId: payload.stationId,
  346. songList: playlist,
  347. bypassQueue: payload.bypassQueue,
  348. },
  349. { bypassQueue: payload.bypassQueue }
  350. )
  351. .then(() => next(null, playlist))
  352. .catch(next);
  353. },
  354. (playlist, next) => {
  355. stationModel.updateOne(
  356. { _id: payload.station._id },
  357. { $set: { playlist: playlist } },
  358. { runValidators: true },
  359. (err) => {
  360. this.runJob(
  361. "UPDATE_STATION",
  362. {
  363. stationId: payload.station._id,
  364. bypassQueue: payload.bypassQueue,
  365. },
  366. { bypassQueue: payload.bypassQueue }
  367. )
  368. .then(() => next(null, playlist))
  369. .catch(next);
  370. }
  371. );
  372. },
  373. ],
  374. (err, newPlaylist) => {
  375. if (err) return reject(new Error(err));
  376. resolve(newPlaylist);
  377. }
  378. );
  379. });
  380. }
  381. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  382. GET_STATION(payload) {
  383. //stationId, cb, bypassValidate = false
  384. return new Promise((resolve, reject) => {
  385. async.waterfall(
  386. [
  387. (next) => {
  388. this.cache
  389. .runJob("HGET", {
  390. table: "stations",
  391. key: payload.stationId,
  392. })
  393. .then((station) => next(null, station))
  394. .catch(next);
  395. },
  396. (station, next) => {
  397. if (station) return next(true, station);
  398. this.stationModel.findOne(
  399. { _id: payload.stationId },
  400. next
  401. );
  402. },
  403. (station, next) => {
  404. if (station) {
  405. if (station.type === "official") {
  406. this.runJob(
  407. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  408. {
  409. stationId: station._id,
  410. songList: station.playlist,
  411. }
  412. )
  413. .then()
  414. .catch();
  415. }
  416. station = this.stationSchema(station);
  417. this.cache
  418. .runJob("HSET", {
  419. table: "stations",
  420. key: payload.stationId,
  421. value: station,
  422. })
  423. .then()
  424. .catch();
  425. next(true, station);
  426. } else next("Station not found");
  427. },
  428. ],
  429. async (err, station) => {
  430. if (err && err !== true) {
  431. err = await this.utils.runJob("GET_ERROR", {
  432. error: err,
  433. });
  434. reject(new Error(err));
  435. } else resolve(station);
  436. }
  437. );
  438. });
  439. }
  440. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  441. GET_STATION_BY_NAME(payload) {
  442. //stationName, cb
  443. return new Promise(async (resolve, reject) => {
  444. const stationModel = await this.db.runJob("GET_MODEL", {
  445. modelName: "station",
  446. });
  447. async.waterfall(
  448. [
  449. (next) => {
  450. stationModel.findOne(
  451. { name: payload.stationName },
  452. next
  453. );
  454. },
  455. (station, next) => {
  456. if (station) {
  457. if (station.type === "official") {
  458. this.runJob(
  459. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  460. {
  461. stationId: station._id,
  462. songList: station.playlist,
  463. }
  464. );
  465. }
  466. this.cache
  467. .runJob("GET_SCHEMA", { schemaName: "station" })
  468. .then((stationSchema) => {
  469. station = stationSchema(station);
  470. this.cache.runJob("HSET", {
  471. table: "stations",
  472. key: station._id,
  473. value: station,
  474. });
  475. next(true, station);
  476. });
  477. } else next("Station not found");
  478. },
  479. ],
  480. (err, station) => {
  481. if (err && err !== true) return reject(new Error(err));
  482. resolve(station);
  483. }
  484. );
  485. });
  486. }
  487. UPDATE_STATION(payload) {
  488. //stationId, cb, bypassValidate = false
  489. return new Promise((resolve, reject) => {
  490. async.waterfall(
  491. [
  492. (next) => {
  493. this.stationModel.findOne(
  494. { _id: payload.stationId },
  495. next
  496. );
  497. },
  498. (station, next) => {
  499. if (!station) {
  500. this.cache
  501. .runJob("HDEL", {
  502. table: "stations",
  503. key: payload.stationId,
  504. })
  505. .then()
  506. .catch();
  507. return next("Station not found");
  508. }
  509. this.cache
  510. .runJob("HSET", {
  511. table: "stations",
  512. key: payload.stationId,
  513. value: station,
  514. })
  515. .then((station) => next(null, station))
  516. .catch(next);
  517. },
  518. ],
  519. async (err, station) => {
  520. if (err && err !== true) {
  521. err = await this.utils.runJob("GET_ERROR", {
  522. error: err,
  523. });
  524. reject(new Error(err));
  525. } else resolve(station);
  526. }
  527. );
  528. });
  529. }
  530. CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  531. //stationId, songList, cb, bypassValidate = false
  532. return new Promise(async (resolve, reject) => {
  533. const officialPlaylistSchema = await this.cache.runJob(
  534. "GET_SCHEMA",
  535. {
  536. schemaName: "officialPlaylist",
  537. }
  538. );
  539. let lessInfoPlaylist = [];
  540. async.each(
  541. payload.songList,
  542. (song, next) => {
  543. this.songs
  544. .runJob("GET_SONG", { id: song })
  545. .then((response) => {
  546. const song = response.song;
  547. if (song) {
  548. let newSong = {
  549. songId: song.songId,
  550. title: song.title,
  551. artists: song.artists,
  552. duration: song.duration,
  553. };
  554. lessInfoPlaylist.push(newSong);
  555. }
  556. })
  557. .finally(() => {
  558. next();
  559. });
  560. },
  561. () => {
  562. this.cache
  563. .runJob("HSET", {
  564. table: "officialPlaylists",
  565. key: payload.stationId,
  566. value: officialPlaylistSchema(
  567. payload.stationId,
  568. lessInfoPlaylist
  569. ),
  570. })
  571. .finally(() => {
  572. this.cache.runJob("PUB", {
  573. channel: "station.newOfficialPlaylist",
  574. value: payload.stationId,
  575. });
  576. resolve();
  577. });
  578. }
  579. );
  580. });
  581. }
  582. SKIP_STATION(payload) {
  583. //stationId
  584. return new Promise((resolve, reject) => {
  585. this.log("INFO", `Skipping station ${payload.stationId}.`);
  586. this.log(
  587. "STATION_ISSUE",
  588. `SKIP_STATION_CB - Station ID: ${payload.stationId}.`
  589. );
  590. async.waterfall(
  591. [
  592. (next) => {
  593. this.runJob(
  594. "GET_STATION",
  595. {
  596. stationId: payload.stationId,
  597. bypassQueue: payload.bypassQueue,
  598. },
  599. { bypassQueue: payload.bypassQueue }
  600. )
  601. .then((station) => {
  602. next(null, station);
  603. })
  604. .catch(() => {});
  605. },
  606. (station, next) => {
  607. if (!station) return next("Station not found.");
  608. if (
  609. station.type === "community" &&
  610. station.partyMode &&
  611. station.queue.length === 0
  612. )
  613. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  614. if (
  615. station.type === "community" &&
  616. station.partyMode &&
  617. station.queue.length > 0
  618. ) {
  619. // Community station with party mode enabled and songs in the queue
  620. if (station.paused) {
  621. return next(null, null, -19, station);
  622. } else {
  623. return this.stationModel.updateOne(
  624. { _id: payload.stationId },
  625. {
  626. $pull: {
  627. queue: {
  628. _id: station.queue[0]._id,
  629. },
  630. },
  631. },
  632. (err) => {
  633. if (err) return next(err);
  634. next(
  635. null,
  636. station.queue[0],
  637. -12,
  638. station
  639. );
  640. }
  641. );
  642. }
  643. }
  644. if (
  645. station.type === "community" &&
  646. !station.partyMode
  647. ) {
  648. this.db
  649. .runJob("GET_MODEL", { modelName: "playlist" })
  650. .then((playlistModel) => {
  651. return playlistModel.findOne(
  652. { _id: station.privatePlaylist },
  653. (err, playlist) => {
  654. if (err) return next(err);
  655. if (!playlist)
  656. return next(
  657. null,
  658. null,
  659. -13,
  660. station
  661. );
  662. playlist = playlist.songs;
  663. if (playlist.length > 0) {
  664. let currentSongIndex;
  665. if (
  666. station.currentSongIndex <
  667. playlist.length - 1
  668. )
  669. currentSongIndex =
  670. station.currentSongIndex +
  671. 1;
  672. else currentSongIndex = 0;
  673. let callback = (err, song) => {
  674. if (err) return next(err);
  675. if (song)
  676. return next(
  677. null,
  678. song,
  679. currentSongIndex,
  680. station
  681. );
  682. else {
  683. let song =
  684. playlist[
  685. currentSongIndex
  686. ];
  687. let currentSong = {
  688. songId: song.songId,
  689. title: song.title,
  690. duration:
  691. song.duration,
  692. likes: -1,
  693. dislikes: -1,
  694. };
  695. return next(
  696. null,
  697. currentSong,
  698. currentSongIndex,
  699. station
  700. );
  701. }
  702. };
  703. if (
  704. playlist[currentSongIndex]
  705. ._id
  706. )
  707. this.songs
  708. .runJob("GET_SONG", {
  709. id:
  710. playlist[
  711. currentSongIndex
  712. ]._id,
  713. })
  714. .then((response) =>
  715. callback(
  716. null,
  717. response.song
  718. )
  719. )
  720. .catch(callback);
  721. else
  722. this.songs
  723. .runJob(
  724. "GET_SONG_FROM_ID",
  725. {
  726. songId:
  727. playlist[
  728. currentSongIndex
  729. ].songId,
  730. }
  731. )
  732. .then((response) =>
  733. callback(
  734. null,
  735. response.song
  736. )
  737. )
  738. .catch(callback);
  739. } else
  740. return next(
  741. null,
  742. null,
  743. -14,
  744. station
  745. );
  746. }
  747. );
  748. });
  749. }
  750. if (
  751. station.type === "official" &&
  752. station.playlist.length === 0
  753. ) {
  754. return this.runJob(
  755. "CALCULATE_SONG_FOR_STATION",
  756. { station, bypassQueue: payload.bypassQueue },
  757. { bypassQueue: payload.bypassQueue }
  758. )
  759. .then((playlist) => {
  760. if (playlist.length === 0)
  761. return next(
  762. null,
  763. this.defaultSong,
  764. 0,
  765. station
  766. );
  767. else {
  768. this.songs
  769. .runJob("GET_SONG", {
  770. id: playlist[0],
  771. })
  772. .then((response) => {
  773. next(
  774. null,
  775. response.song,
  776. 0,
  777. station
  778. );
  779. })
  780. .catch((err) => {
  781. return next(
  782. null,
  783. this.defaultSong,
  784. 0,
  785. station
  786. );
  787. });
  788. }
  789. })
  790. .catch(next);
  791. }
  792. if (
  793. station.type === "official" &&
  794. station.playlist.length > 0
  795. ) {
  796. async.doUntil(
  797. (next) => {
  798. if (
  799. station.currentSongIndex <
  800. station.playlist.length - 1
  801. ) {
  802. this.songs
  803. .runJob("GET_SONG", {
  804. id:
  805. station.playlist[
  806. station.currentSongIndex +
  807. 1
  808. ],
  809. })
  810. .then((response) => {
  811. return next(
  812. null,
  813. response.song,
  814. station.currentSongIndex + 1
  815. );
  816. })
  817. .catch((err) => {
  818. station.currentSongIndex++;
  819. next(null, null, null);
  820. });
  821. } else {
  822. this.runJob(
  823. "CALCULATE_SONG_FOR_STATION",
  824. {
  825. station,
  826. bypassQueue:
  827. payload.bypassQueue,
  828. },
  829. { bypassQueue: payload.bypassQueue }
  830. )
  831. .then((newPlaylist) => {
  832. this.songs.getSong(
  833. newPlaylist[0],
  834. (err, song) => {
  835. if (err || !song)
  836. return next(
  837. null,
  838. this
  839. .defaultSong,
  840. 0
  841. );
  842. station.playlist = newPlaylist;
  843. next(null, song, 0);
  844. }
  845. );
  846. })
  847. .catch((err) => {
  848. next(null, this.defaultSong, 0);
  849. });
  850. }
  851. },
  852. (song, currentSongIndex, next) => {
  853. if (!!song)
  854. return next(
  855. null,
  856. true,
  857. currentSongIndex
  858. );
  859. else return next(null, false);
  860. },
  861. (err, song, currentSongIndex) => {
  862. return next(
  863. err,
  864. song,
  865. currentSongIndex,
  866. station
  867. );
  868. }
  869. );
  870. }
  871. },
  872. (song, currentSongIndex, station, next) => {
  873. let $set = {};
  874. if (song === null) $set.currentSong = null;
  875. else if (song.likes === -1 && song.dislikes === -1) {
  876. $set.currentSong = {
  877. songId: song.songId,
  878. title: song.title,
  879. duration: song.duration,
  880. skipDuration: 0,
  881. likes: -1,
  882. dislikes: -1,
  883. };
  884. } else {
  885. $set.currentSong = {
  886. songId: song.songId,
  887. title: song.title,
  888. artists: song.artists,
  889. duration: song.duration,
  890. likes: song.likes,
  891. dislikes: song.dislikes,
  892. skipDuration: song.skipDuration,
  893. thumbnail: song.thumbnail,
  894. };
  895. }
  896. if (currentSongIndex >= 0)
  897. $set.currentSongIndex = currentSongIndex;
  898. $set.startedAt = Date.now();
  899. $set.timePaused = 0;
  900. if (station.paused) $set.pausedAt = Date.now();
  901. next(null, $set, station);
  902. },
  903. ($set, station, next) => {
  904. this.stationModel.updateOne(
  905. { _id: station._id },
  906. { $set },
  907. (err) => {
  908. this.runJob(
  909. "UPDATE_STATION",
  910. {
  911. stationId: station._id,
  912. bypassQueue: payload.bypassQueue,
  913. },
  914. { bypassQueue: payload.bypassQueue }
  915. )
  916. .then((station) => {
  917. if (
  918. station.type === "community" &&
  919. station.partyMode === true
  920. )
  921. this.cache
  922. .runJob("PUB", {
  923. channel:
  924. "station.queueUpdate",
  925. value: payload.stationId,
  926. })
  927. .then()
  928. .catch();
  929. next(null, station);
  930. })
  931. .catch(next);
  932. }
  933. );
  934. },
  935. ],
  936. async (err, station) => {
  937. if (err) {
  938. err = await this.utils.runJob("GET_ERROR", {
  939. error: err,
  940. });
  941. this.log(
  942. "ERROR",
  943. `Skipping station "${payload.stationId}" failed. "${err}"`
  944. );
  945. reject(new Error(err));
  946. } else {
  947. if (
  948. station.currentSong !== null &&
  949. station.currentSong.songId !== undefined
  950. ) {
  951. station.currentSong.skipVotes = 0;
  952. }
  953. //TODO Pub/Sub this
  954. this.utils
  955. .runJob("EMIT_TO_ROOM", {
  956. room: `station.${station._id}`,
  957. args: [
  958. "event:songs.next",
  959. {
  960. currentSong: station.currentSong,
  961. startedAt: station.startedAt,
  962. paused: station.paused,
  963. timePaused: 0,
  964. },
  965. ],
  966. })
  967. .then()
  968. .catch();
  969. if (station.privacy === "public") {
  970. this.utils
  971. .runJob("EMIT_TO_ROOM", {
  972. room: "home",
  973. args: [
  974. "event:station.nextSong",
  975. station._id,
  976. station.currentSong,
  977. ],
  978. })
  979. .then()
  980. .catch();
  981. } else {
  982. let sockets = await this.utils.runJob(
  983. "GET_ROOM_SOCKETS",
  984. { room: "home" }
  985. );
  986. for (let socketId in sockets) {
  987. let socket = sockets[socketId];
  988. let session = sockets[socketId].session;
  989. if (session.sessionId) {
  990. this.cache
  991. .runJob("HGET", {
  992. table: "sessions",
  993. key: session.sessionId,
  994. })
  995. .then((session) => {
  996. if (session) {
  997. this.db
  998. .runJob("GET_MODEL", {
  999. modelName: "user",
  1000. })
  1001. .then((userModel) => {
  1002. userModel.findOne(
  1003. {
  1004. _id:
  1005. session.userId,
  1006. },
  1007. (err, user) => {
  1008. if (
  1009. !err &&
  1010. user
  1011. ) {
  1012. if (
  1013. user.role ===
  1014. "admin"
  1015. )
  1016. socket.emit(
  1017. "event:station.nextSong",
  1018. station._id,
  1019. station.currentSong
  1020. );
  1021. else if (
  1022. station.type ===
  1023. "community" &&
  1024. station.owner ===
  1025. session.userId
  1026. )
  1027. socket.emit(
  1028. "event:station.nextSong",
  1029. station._id,
  1030. station.currentSong
  1031. );
  1032. }
  1033. }
  1034. );
  1035. });
  1036. }
  1037. });
  1038. }
  1039. }
  1040. }
  1041. if (
  1042. station.currentSong !== null &&
  1043. station.currentSong.songId !== undefined
  1044. ) {
  1045. this.utils.runJob("SOCKETS_JOIN_SONG_ROOM", {
  1046. sockets: await this.utils.runJob(
  1047. "GET_ROOM_SOCKETS",
  1048. { room: `station.${station._id}` }
  1049. ),
  1050. room: `song.${station.currentSong.songId}`,
  1051. });
  1052. if (!station.paused) {
  1053. this.notifications.runJob("SCHEDULE", {
  1054. name: `stations.nextSong?id=${station._id}`,
  1055. time: station.currentSong.duration * 1000,
  1056. station,
  1057. });
  1058. }
  1059. } else {
  1060. this.utils
  1061. .runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  1062. sockets: await this.utils.runJob(
  1063. "GET_ROOM_SOCKETS",
  1064. { room: `station.${station._id}` }
  1065. ),
  1066. })
  1067. .then()
  1068. .catch();
  1069. }
  1070. resolve({ station: station });
  1071. }
  1072. }
  1073. );
  1074. });
  1075. }
  1076. CAN_USER_VIEW_STATION(payload) {
  1077. // station, userId, cb
  1078. return new Promise((resolve, reject) => {
  1079. async.waterfall(
  1080. [
  1081. (next) => {
  1082. if (payload.station.privacy !== "private")
  1083. return next(true);
  1084. if (!payload.userId) return next("Not allowed");
  1085. next();
  1086. },
  1087. (next) => {
  1088. this.db
  1089. .runJob("GET_MODEL", {
  1090. modelName: "user",
  1091. })
  1092. .then((userModel) => {
  1093. userModel.findOne(
  1094. { _id: payload.userId },
  1095. next
  1096. );
  1097. });
  1098. },
  1099. (user, next) => {
  1100. if (!user) return next("Not allowed");
  1101. if (user.role === "admin") return next(true);
  1102. if (payload.station.type === "official")
  1103. return next("Not allowed");
  1104. if (payload.station.owner === payload.userId)
  1105. return next(true);
  1106. next("Not allowed");
  1107. },
  1108. ],
  1109. async (errOrResult) => {
  1110. if (errOrResult !== true && errOrResult !== "Not allowed") {
  1111. errOrResult = await this.utils.runJob("GET_ERROR", {
  1112. error: errOrResult,
  1113. });
  1114. reject(new Error(errOrResult));
  1115. } else {
  1116. resolve(errOrResult === true ? true : false);
  1117. }
  1118. }
  1119. );
  1120. });
  1121. }
  1122. }
  1123. module.exports = new StationsModule();