stations.js 53 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153
  1. const CoreClass = require("../core.js");
  2. const async = require("async");
  3. let subscription = null;
  4. class StationsModule extends CoreClass {
  5. constructor() {
  6. super("stations");
  7. }
  8. initialize() {
  9. return new Promise(async (resolve, reject) => {
  10. this.cache = this.moduleManager.modules["cache"];
  11. this.db = this.moduleManager.modules["db"];
  12. this.utils = this.moduleManager.modules["utils"];
  13. this.songs = this.moduleManager.modules["songs"];
  14. this.notifications = this.moduleManager.modules["notifications"];
  15. this.defaultSong = {
  16. songId: "60ItHLz5WEA",
  17. title: "Faded - Alan Walker",
  18. duration: 212,
  19. skipDuration: 0,
  20. likes: -1,
  21. dislikes: -1,
  22. };
  23. //TEMP
  24. this.cache.runJob("SUB", {
  25. channel: "station.pause",
  26. cb: async (stationId) => {
  27. this.notifications
  28. .runJob("REMOVE", {
  29. subscription: `stations.nextSong?id=${stationId}`,
  30. })
  31. .then();
  32. },
  33. });
  34. this.cache.runJob("SUB", {
  35. channel: "station.resume",
  36. cb: async (stationId) => {
  37. this.runJob("INITIALIZE_STATION", { stationId }).then();
  38. },
  39. });
  40. this.cache.runJob("SUB", {
  41. channel: "station.queueUpdate",
  42. cb: async (stationId) => {
  43. this.runJob("GET_STATION", { stationId }).then(
  44. (station) => {
  45. if (
  46. !station.currentSong &&
  47. station.queue.length > 0
  48. ) {
  49. this.runJob("INITIALIZE_STATION", {
  50. stationId,
  51. }).then();
  52. }
  53. }
  54. );
  55. },
  56. });
  57. this.cache.runJob("SUB", {
  58. channel: "station.newOfficialPlaylist",
  59. cb: async (stationId) => {
  60. this.cache
  61. .runJob("HGET", {
  62. table: "officialPlaylists",
  63. key: stationId,
  64. })
  65. .then((playlistObj) => {
  66. if (playlistObj) {
  67. this.utils.runJob("EMIT_TO_ROOM", {
  68. room: `station.${stationId}`,
  69. args: [
  70. "event:newOfficialPlaylist",
  71. playlistObj.songs,
  72. ],
  73. });
  74. }
  75. });
  76. },
  77. });
  78. const stationModel = (this.stationModel = await this.db.runJob(
  79. "GET_MODEL",
  80. {
  81. modelName: "station",
  82. }
  83. ));
  84. const stationSchema = (this.stationSchema = await this.cache.runJob(
  85. "GET_SCHEMA",
  86. {
  87. schemaName: "station",
  88. }
  89. ));
  90. async.waterfall(
  91. [
  92. (next) => {
  93. this.setStage(2);
  94. this.cache
  95. .runJob("HGETALL", { table: "stations" })
  96. .then((stations) => next(null, stations))
  97. .catch(next);
  98. },
  99. (stations, next) => {
  100. this.setStage(3);
  101. if (!stations) return next();
  102. let stationIds = Object.keys(stations);
  103. async.each(
  104. stationIds,
  105. (stationId, next) => {
  106. stationModel.findOne(
  107. { _id: stationId },
  108. (err, station) => {
  109. if (err) next(err);
  110. else if (!station) {
  111. this.cache
  112. .runJob("HDEL", {
  113. table: "stations",
  114. key: stationId,
  115. })
  116. .then(() => next())
  117. .catch(next);
  118. } else next();
  119. }
  120. );
  121. },
  122. next
  123. );
  124. },
  125. (next) => {
  126. this.setStage(4);
  127. stationModel.find({}, next);
  128. },
  129. (stations, next) => {
  130. this.setStage(5);
  131. async.each(
  132. stations,
  133. (station, next2) => {
  134. async.waterfall(
  135. [
  136. (next) => {
  137. this.cache
  138. .runJob("HSET", {
  139. table: "stations",
  140. key: station._id,
  141. value: stationSchema(
  142. station
  143. ),
  144. })
  145. .then((station) =>
  146. next(null, station)
  147. )
  148. .catch(next);
  149. },
  150. (station, next) => {
  151. this.runJob(
  152. "INITIALIZE_STATION",
  153. {
  154. stationId: station._id,
  155. bypassQueue: true,
  156. },
  157. { bypassQueue: true }
  158. )
  159. .then(() => next())
  160. .catch(next); // bypassQueue is true because otherwise the module will never initialize
  161. },
  162. ],
  163. (err) => {
  164. next2(err);
  165. }
  166. );
  167. },
  168. next
  169. );
  170. },
  171. ],
  172. async (err) => {
  173. if (err) {
  174. err = await this.utils.runJob("GET_ERROR", {
  175. error: err,
  176. });
  177. reject(new Error(err));
  178. } else {
  179. resolve();
  180. }
  181. }
  182. );
  183. });
  184. }
  185. INITIALIZE_STATION(payload) {
  186. //stationId, cb, bypassValidate = false
  187. return new Promise((resolve, reject) => {
  188. // if (typeof cb !== 'function') cb = ()=>{};
  189. async.waterfall(
  190. [
  191. (next) => {
  192. this.runJob(
  193. "GET_STATION",
  194. { stationId: payload.stationId },
  195. { bypassQueue: payload.bypassQueue }
  196. )
  197. .then((station) => next(null, station))
  198. .catch(next);
  199. },
  200. (station, next) => {
  201. if (!station) return next("Station not found.");
  202. this.notifications
  203. .runJob("UNSCHEDULE", {
  204. subscription: `stations.nextSong?id=${station._id}`,
  205. })
  206. .then()
  207. .catch();
  208. this.notifications
  209. .runJob("SUBSCRIBE", {
  210. subscription: `stations.nextSong?id=${station._id}`,
  211. cb: () =>
  212. this.runJob("SKIP_STATION", {
  213. stationId: station._id,
  214. }),
  215. unique: true,
  216. station,
  217. })
  218. .then()
  219. .catch();
  220. if (station.paused) return next(true, station);
  221. next(null, station);
  222. },
  223. (station, next) => {
  224. if (!station.currentSong) {
  225. return this.runJob(
  226. "SKIP_STATION",
  227. {
  228. stationId: station._id,
  229. bypassQueue: payload.bypassQueue,
  230. },
  231. { bypassQueue: payload.bypassQueue }
  232. )
  233. .then((station) => next(true, station))
  234. .catch(next)
  235. .finally(() => {});
  236. }
  237. let timeLeft =
  238. station.currentSong.duration * 1000 -
  239. (Date.now() -
  240. station.startedAt -
  241. station.timePaused);
  242. if (isNaN(timeLeft)) timeLeft = -1;
  243. if (
  244. station.currentSong.duration * 1000 < timeLeft ||
  245. timeLeft < 0
  246. ) {
  247. this.runJob(
  248. "SKIP_STATION",
  249. { stationId: station._id },
  250. { bypassQueue: payload.bypassQueue }
  251. )
  252. .then((station) => next(null, station))
  253. .catch(next);
  254. } else {
  255. //name, time, cb, station
  256. this.notifications.runJob("SCHEDULE", {
  257. name: `stations.nextSong?id=${station._id}`,
  258. time: timeLeft,
  259. station,
  260. });
  261. next(null, station);
  262. }
  263. },
  264. ],
  265. async (err, station) => {
  266. if (err && err !== true) {
  267. err = await this.utils.runJob("GET_ERROR", {
  268. error: err,
  269. });
  270. reject(new Error(err));
  271. } else resolve(station);
  272. }
  273. );
  274. });
  275. }
  276. CALCULATE_SONG_FOR_STATION(payload) {
  277. //station, cb, bypassValidate = false
  278. return new Promise(async (resolve, reject) => {
  279. const songModel = await this.db.runJob("GET_MODEL", {
  280. modelName: "song",
  281. });
  282. const stationModel = await this.db.runJob("GET_MODEL", {
  283. modelName: "station",
  284. });
  285. let songList = [];
  286. async.waterfall(
  287. [
  288. (next) => {
  289. if (payload.station.genres.length === 0) return next();
  290. let genresDone = [];
  291. payload.station.genres.forEach((genre) => {
  292. songModel.find({ genres: genre }, (err, songs) => {
  293. if (!err) {
  294. songs.forEach((song) => {
  295. if (songList.indexOf(song._id) === -1) {
  296. let found = false;
  297. song.genres.forEach((songGenre) => {
  298. if (
  299. payload.station.blacklistedGenres.indexOf(
  300. songGenre
  301. ) !== -1
  302. )
  303. found = true;
  304. });
  305. if (!found) {
  306. songList.push(song._id);
  307. }
  308. }
  309. });
  310. }
  311. genresDone.push(genre);
  312. if (
  313. genresDone.length ===
  314. payload.station.genres.length
  315. )
  316. next();
  317. });
  318. });
  319. },
  320. (next) => {
  321. let playlist = [];
  322. songList.forEach(function(songId) {
  323. if (payload.station.playlist.indexOf(songId) === -1)
  324. playlist.push(songId);
  325. });
  326. payload.station.playlist.filter((songId) => {
  327. if (songList.indexOf(songId) !== -1)
  328. playlist.push(songId);
  329. });
  330. this.utils
  331. .runJob("SHUFFLE", { array: playlist })
  332. .then((playlist) => next(null, playlist))
  333. .catch(next);
  334. },
  335. (playlist, next) => {
  336. this.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  337. stationId,
  338. songList: playlist,
  339. })
  340. .then(() => next(null, playlist))
  341. .catch(next);
  342. },
  343. (playlist, next) => {
  344. stationModel.updateOne(
  345. { _id: station._id },
  346. { $set: { playlist: playlist } },
  347. { runValidators: true },
  348. (err) => {
  349. this.runJob("UPDATE_STATION", {
  350. stationId: station._id,
  351. })
  352. .then(() => next(null, playlist))
  353. .catch(next);
  354. }
  355. );
  356. },
  357. ],
  358. (err, newPlaylist) => {
  359. if (err) return reject(new Error(err));
  360. resolve(newPlaylist);
  361. }
  362. );
  363. });
  364. }
  365. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  366. GET_STATION(payload) {
  367. //stationId, cb, bypassValidate = false
  368. return new Promise((resolve, reject) => {
  369. async.waterfall(
  370. [
  371. (next) => {
  372. this.cache
  373. .runJob("HGET", {
  374. table: "stations",
  375. key: payload.stationId,
  376. })
  377. .then((station) => next(null, station))
  378. .catch(next);
  379. },
  380. (station, next) => {
  381. if (station) return next(true, station);
  382. this.stationModel.findOne(
  383. { _id: payload.stationId },
  384. next
  385. );
  386. },
  387. (station, next) => {
  388. if (station) {
  389. if (station.type === "official") {
  390. this.runJob(
  391. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  392. {
  393. stationId: station._id,
  394. songList: station.playlist,
  395. }
  396. )
  397. .then()
  398. .catch();
  399. }
  400. station = this.stationSchema(station);
  401. this.cache
  402. .runJob("HSET", {
  403. table: "stations",
  404. key: payload.stationId,
  405. value: station,
  406. })
  407. .then()
  408. .catch();
  409. next(true, station);
  410. } else next("Station not found");
  411. },
  412. ],
  413. async (err, station) => {
  414. if (err && err !== true) {
  415. err = await this.utils.runJob("GET_ERROR", {
  416. error: err,
  417. });
  418. reject(new Error(err));
  419. } else resolve(station);
  420. }
  421. );
  422. });
  423. }
  424. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  425. GET_STATION_BY_NAME(payload) {
  426. //stationName, cb
  427. return new Promise(async (resolve, reject) => {
  428. const stationModel = await this.db.runJob("GET_MODEL", {
  429. modelName: "station",
  430. });
  431. async.waterfall(
  432. [
  433. (next) => {
  434. stationModel.findOne(
  435. { name: payload.stationName },
  436. next
  437. );
  438. },
  439. (station, next) => {
  440. if (station) {
  441. if (station.type === "official") {
  442. this.runJob(
  443. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  444. {
  445. stationId: station._id,
  446. songList: station.playlist,
  447. }
  448. );
  449. }
  450. this.cache
  451. .runJob("GET_SCHEMA", { schemaName: "station" })
  452. .then((stationSchema) => {
  453. station = stationSchema(station);
  454. this.cache.runJob("HSET", {
  455. table: "stations",
  456. key: station._id,
  457. value: station,
  458. });
  459. next(true, station);
  460. });
  461. } else next("Station not found");
  462. },
  463. ],
  464. (err, station) => {
  465. if (err && err !== true) return reject(new Error(err));
  466. resolve(station);
  467. }
  468. );
  469. });
  470. }
  471. UPDATE_STATION(payload) {
  472. //stationId, cb, bypassValidate = false
  473. return new Promise((resolve, reject) => {
  474. async.waterfall(
  475. [
  476. (next) => {
  477. this.stationModel.findOne(
  478. { _id: payload.stationId },
  479. next
  480. );
  481. },
  482. (station, next) => {
  483. if (!station) {
  484. this.cache
  485. .runJob("HDEL", {
  486. table: "stations",
  487. key: payload.stationId,
  488. })
  489. .then()
  490. .catch();
  491. return next("Station not found");
  492. }
  493. this.cache
  494. .runJob("HSET", {
  495. table: "stations",
  496. key: payload.stationId,
  497. value: station,
  498. })
  499. .then((station) => next(null, station))
  500. .catch(next);
  501. },
  502. ],
  503. async (err, station) => {
  504. if (err && err !== true) {
  505. err = await this.utils.runJob("GET_ERROR", {
  506. error: err,
  507. });
  508. reject(new Error(err));
  509. } else resolve(station);
  510. }
  511. );
  512. });
  513. }
  514. CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  515. //stationId, songList, cb, bypassValidate = false
  516. return new Promise(async (resolve, reject) => {
  517. const officialPlaylistSchema = await this.cache.runJob(
  518. "GET_SCHEMA",
  519. {
  520. schemaName: "officialPlaylist",
  521. }
  522. );
  523. let lessInfoPlaylist = [];
  524. async.each(
  525. payload.songList,
  526. (song, next) => {
  527. this.songs
  528. .runJob("GET_SONG", { id: song })
  529. .then((response) => {
  530. const song = response.song;
  531. if (song) {
  532. let newSong = {
  533. songId: song.songId,
  534. title: song.title,
  535. artists: song.artists,
  536. duration: song.duration,
  537. };
  538. lessInfoPlaylist.push(newSong);
  539. }
  540. })
  541. .finally(() => {
  542. next();
  543. });
  544. },
  545. () => {
  546. this.cache
  547. .runJob("HSET", {
  548. table: "officialPlaylists",
  549. key: payload.stationId,
  550. value: officialPlaylistSchema(
  551. payload.stationId,
  552. lessInfoPlaylist
  553. ),
  554. })
  555. .finally(() => {
  556. this.cache.runJob("PUB", {
  557. channel: "station.newOfficialPlaylist",
  558. value: payload.stationId,
  559. });
  560. resolve();
  561. });
  562. }
  563. );
  564. });
  565. }
  566. SKIP_STATION(payload) {
  567. //stationId
  568. return new Promise((resolve, reject) => {
  569. this.log("INFO", `Skipping station ${payload.stationId}.`);
  570. this.log(
  571. "STATION_ISSUE",
  572. `SKIP_STATION_CB - Station ID: ${payload.stationId}.`
  573. );
  574. async.waterfall(
  575. [
  576. (next) => {
  577. this.runJob(
  578. "GET_STATION",
  579. {
  580. stationId: payload.stationId,
  581. },
  582. { bypassQueue: payload.bypassQueue }
  583. )
  584. .then((station) => {
  585. next(null, station);
  586. })
  587. .catch(() => {});
  588. },
  589. (station, next) => {
  590. if (!station) return next("Station not found.");
  591. if (
  592. station.type === "community" &&
  593. station.partyMode &&
  594. station.queue.length === 0
  595. )
  596. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  597. if (
  598. station.type === "community" &&
  599. station.partyMode &&
  600. station.queue.length > 0
  601. ) {
  602. // Community station with party mode enabled and songs in the queue
  603. if (station.paused) {
  604. return next(null, null, -19, station);
  605. } else {
  606. return this.stationModel.updateOne(
  607. { _id: payload.stationId },
  608. {
  609. $pull: {
  610. queue: {
  611. _id: station.queue[0]._id,
  612. },
  613. },
  614. },
  615. (err) => {
  616. if (err) return next(err);
  617. next(
  618. null,
  619. station.queue[0],
  620. -12,
  621. station
  622. );
  623. }
  624. );
  625. }
  626. }
  627. if (
  628. station.type === "community" &&
  629. !station.partyMode
  630. ) {
  631. this.db
  632. .runJob("GET_MODEL", { modelName: "playlist" })
  633. .then((playlistModel) => {
  634. return playlistModel.findOne(
  635. { _id: station.privatePlaylist },
  636. (err, playlist) => {
  637. if (err) return next(err);
  638. if (!playlist)
  639. return next(
  640. null,
  641. null,
  642. -13,
  643. station
  644. );
  645. playlist = playlist.songs;
  646. if (playlist.length > 0) {
  647. let currentSongIndex;
  648. if (
  649. station.currentSongIndex <
  650. playlist.length - 1
  651. )
  652. currentSongIndex =
  653. station.currentSongIndex +
  654. 1;
  655. else currentSongIndex = 0;
  656. let callback = (err, song) => {
  657. if (err) return next(err);
  658. if (song)
  659. return next(
  660. null,
  661. song,
  662. currentSongIndex,
  663. station
  664. );
  665. else {
  666. let song =
  667. playlist[
  668. currentSongIndex
  669. ];
  670. let currentSong = {
  671. songId: song.songId,
  672. title: song.title,
  673. duration:
  674. song.duration,
  675. likes: -1,
  676. dislikes: -1,
  677. };
  678. return next(
  679. null,
  680. currentSong,
  681. currentSongIndex,
  682. station
  683. );
  684. }
  685. };
  686. if (
  687. playlist[currentSongIndex]
  688. ._id
  689. )
  690. this.songs
  691. .runJob("GET_SONG", {
  692. id:
  693. playlist[
  694. currentSongIndex
  695. ]._id,
  696. })
  697. .then((response) =>
  698. callback(
  699. null,
  700. response.song
  701. )
  702. )
  703. .catch(callback);
  704. else
  705. this.songs
  706. .runJob(
  707. "GET_SONG_FROM_ID",
  708. {
  709. songId:
  710. playlist[
  711. currentSongIndex
  712. ].songId,
  713. }
  714. )
  715. .then((response) =>
  716. callback(
  717. null,
  718. response.song
  719. )
  720. )
  721. .catch(callback);
  722. } else
  723. return next(
  724. null,
  725. null,
  726. -14,
  727. station
  728. );
  729. }
  730. );
  731. });
  732. }
  733. if (
  734. station.type === "official" &&
  735. station.playlist.length === 0
  736. ) {
  737. return this.runJob(
  738. "CALCULATE_SONG_FOR_STATION",
  739. { station, bypassQueue: payload.bypassQueue },
  740. { bypassQueue: payload.bypassQueue }
  741. )
  742. .then((playlist) => {
  743. if (playlist.length === 0)
  744. return next(
  745. null,
  746. this.defaultSong,
  747. 0,
  748. station
  749. );
  750. else {
  751. this.songs
  752. .runJob("GET_SONG", {
  753. id: playlist[0],
  754. })
  755. .then((response) => {
  756. next(
  757. null,
  758. response.song,
  759. 0,
  760. station
  761. );
  762. })
  763. .catch((err) => {
  764. return next(
  765. null,
  766. this.defaultSong,
  767. 0,
  768. station
  769. );
  770. });
  771. }
  772. })
  773. .catch(next);
  774. }
  775. if (
  776. station.type === "official" &&
  777. station.playlist.length > 0
  778. ) {
  779. async.doUntil(
  780. (next) => {
  781. if (
  782. station.currentSongIndex <
  783. station.playlist.length - 1
  784. ) {
  785. this.songs
  786. .runJob("GET_SONG", {
  787. id:
  788. station.playlist[
  789. station.currentSongIndex +
  790. 1
  791. ],
  792. })
  793. .then((response) => {
  794. return next(
  795. null,
  796. response.song,
  797. station.currentSongIndex + 1
  798. );
  799. })
  800. .catch((err) => {
  801. station.currentSongIndex++;
  802. next(null, null, null);
  803. });
  804. } else {
  805. this.runJob(
  806. "CALCULATE_SONG_FOR_STATION",
  807. {
  808. station,
  809. bypassQueue:
  810. payload.bypassQueue,
  811. }
  812. )
  813. .then((newPlaylist) => {
  814. this.songs.getSong(
  815. newPlaylist[0],
  816. (err, song) => {
  817. if (err || !song)
  818. return next(
  819. null,
  820. this
  821. .defaultSong,
  822. 0
  823. );
  824. station.playlist = newPlaylist;
  825. next(null, song, 0);
  826. }
  827. );
  828. })
  829. .catch((err) => {
  830. next(null, this.defaultSong, 0);
  831. });
  832. }
  833. },
  834. (song, currentSongIndex, next) => {
  835. if (!!song)
  836. return next(
  837. null,
  838. true,
  839. currentSongIndex
  840. );
  841. else return next(null, false);
  842. },
  843. (err, song, currentSongIndex) => {
  844. return next(
  845. err,
  846. song,
  847. currentSongIndex,
  848. station
  849. );
  850. }
  851. );
  852. }
  853. },
  854. (song, currentSongIndex, station, next) => {
  855. let $set = {};
  856. if (song === null) $set.currentSong = null;
  857. else if (song.likes === -1 && song.dislikes === -1) {
  858. $set.currentSong = {
  859. songId: song.songId,
  860. title: song.title,
  861. duration: song.duration,
  862. skipDuration: 0,
  863. likes: -1,
  864. dislikes: -1,
  865. };
  866. } else {
  867. $set.currentSong = {
  868. songId: song.songId,
  869. title: song.title,
  870. artists: song.artists,
  871. duration: song.duration,
  872. likes: song.likes,
  873. dislikes: song.dislikes,
  874. skipDuration: song.skipDuration,
  875. thumbnail: song.thumbnail,
  876. };
  877. }
  878. if (currentSongIndex >= 0)
  879. $set.currentSongIndex = currentSongIndex;
  880. $set.startedAt = Date.now();
  881. $set.timePaused = 0;
  882. if (station.paused) $set.pausedAt = Date.now();
  883. next(null, $set, station);
  884. },
  885. ($set, station, next) => {
  886. this.stationModel.updateOne(
  887. { _id: station._id },
  888. { $set },
  889. (err) => {
  890. this.runJob(
  891. "UPDATE_STATION",
  892. {
  893. stationId: station._id,
  894. bypassQueue: payload.bypassQueue,
  895. },
  896. { bypassQueue: payload.bypassQueue }
  897. )
  898. .then((station) => {
  899. if (
  900. station.type === "community" &&
  901. station.partyMode === true
  902. )
  903. this.cache
  904. .runJob("PUB", {
  905. channel:
  906. "station.queueUpdate",
  907. value: payload.stationId,
  908. })
  909. .then()
  910. .catch();
  911. next(null, station);
  912. })
  913. .catch(next);
  914. }
  915. );
  916. },
  917. ],
  918. async (err, station) => {
  919. if (err) {
  920. err = await this.utils.runJob("GET_ERROR", {
  921. error: err,
  922. });
  923. this.log(
  924. "ERROR",
  925. `Skipping station "${payload.stationId}" failed. "${err}"`
  926. );
  927. reject(new Error(err));
  928. } else {
  929. if (
  930. station.currentSong !== null &&
  931. station.currentSong.songId !== undefined
  932. ) {
  933. station.currentSong.skipVotes = 0;
  934. }
  935. //TODO Pub/Sub this
  936. this.utils
  937. .runJob("EMIT_TO_ROOM", {
  938. room: `station.${station._id}`,
  939. args: [
  940. "event:songs.next",
  941. {
  942. currentSong: station.currentSong,
  943. startedAt: station.startedAt,
  944. paused: station.paused,
  945. timePaused: 0,
  946. },
  947. ],
  948. })
  949. .then()
  950. .catch();
  951. if (station.privacy === "public") {
  952. this.utils
  953. .runJob("EMIT_TO_ROOM", {
  954. room: "home",
  955. args: [
  956. "event:station.nextSong",
  957. station._id,
  958. station.currentSong,
  959. ],
  960. })
  961. .then()
  962. .catch();
  963. } else {
  964. let sockets = await this.utils.runJob(
  965. "GET_ROOM_SOCKETS",
  966. { room: "home" }
  967. );
  968. for (let socketId in sockets) {
  969. let socket = sockets[socketId];
  970. let session = sockets[socketId].session;
  971. if (session.sessionId) {
  972. this.cache
  973. .runJob("HGET", {
  974. table: "sessions",
  975. key: session.sessionId,
  976. })
  977. .then((session) => {
  978. if (session) {
  979. this.db
  980. .runJob("GET_MODEL", {
  981. modelName: "user",
  982. })
  983. .then((userModel) => {
  984. userModel.findOne(
  985. {
  986. _id:
  987. session.userId,
  988. },
  989. (err, user) => {
  990. if (
  991. !err &&
  992. user
  993. ) {
  994. if (
  995. user.role ===
  996. "admin"
  997. )
  998. socket.emit(
  999. "event:station.nextSong",
  1000. station._id,
  1001. station.currentSong
  1002. );
  1003. else if (
  1004. station.type ===
  1005. "community" &&
  1006. station.owner ===
  1007. session.userId
  1008. )
  1009. socket.emit(
  1010. "event:station.nextSong",
  1011. station._id,
  1012. station.currentSong
  1013. );
  1014. }
  1015. }
  1016. );
  1017. });
  1018. }
  1019. });
  1020. }
  1021. }
  1022. }
  1023. if (
  1024. station.currentSong !== null &&
  1025. station.currentSong.songId !== undefined
  1026. ) {
  1027. this.utils.runJob("SOCKETS_JOIN_SONG_ROOM", {
  1028. sockets: await this.utils.runJob(
  1029. "GET_ROOM_SOCKETS",
  1030. { room: `station.${station._id}` }
  1031. ),
  1032. room: `song.${station.currentSong.songId}`,
  1033. });
  1034. if (!station.paused) {
  1035. this.notifications.runJob("SCHEDULE", {
  1036. name: `stations.nextSong?id=${station._id}`,
  1037. time: station.currentSong.duration * 1000,
  1038. station,
  1039. });
  1040. }
  1041. } else {
  1042. this.utils
  1043. .runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  1044. sockets: await this.utils.runJob(
  1045. "GET_ROOM_SOCKETS",
  1046. { room: `station.${station._id}` }
  1047. ),
  1048. })
  1049. .then()
  1050. .catch();
  1051. }
  1052. resolve({ station: station });
  1053. }
  1054. }
  1055. );
  1056. });
  1057. }
  1058. CAN_USER_VIEW_STATION(payload) {
  1059. // station, userId, cb
  1060. return new Promise((resolve, reject) => {
  1061. async.waterfall(
  1062. [
  1063. (next) => {
  1064. if (payload.station.privacy !== "private")
  1065. return next(true);
  1066. if (!payload.userId) return next("Not allowed");
  1067. next();
  1068. },
  1069. (next) => {
  1070. this.db
  1071. .runJob("GET_MODEL", {
  1072. modelName: "user",
  1073. })
  1074. .then((userModel) => {
  1075. userModel.findOne(
  1076. { _id: payload.userId },
  1077. next
  1078. );
  1079. });
  1080. },
  1081. (user, next) => {
  1082. if (!user) return next("Not allowed");
  1083. if (user.role === "admin") return next(true);
  1084. if (payload.station.type === "official")
  1085. return next("Not allowed");
  1086. if (payload.station.owner === payload.userId)
  1087. return next(true);
  1088. next("Not allowed");
  1089. },
  1090. ],
  1091. async (errOrResult) => {
  1092. if (errOrResult !== true && errOrResult !== "Not allowed") {
  1093. errOrResult = await this.utils.runJob("GET_ERROR", {
  1094. error: errOrResult,
  1095. });
  1096. reject(new Error(errOrResult));
  1097. } else {
  1098. resolve(errOrResult === true ? true : false);
  1099. }
  1100. }
  1101. );
  1102. });
  1103. }
  1104. }
  1105. module.exports = new StationsModule();