stations.js 55 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197
  1. const CoreClass = require("../core.js");
  2. const async = require("async");
  3. let subscription = null;
  4. class StationsModule extends CoreClass {
  5. constructor() {
  6. super("stations");
  7. }
  8. initialize() {
  9. return new Promise(async (resolve, reject) => {
  10. this.cache = this.moduleManager.modules["cache"];
  11. this.db = this.moduleManager.modules["db"];
  12. this.utils = this.moduleManager.modules["utils"];
  13. this.songs = this.moduleManager.modules["songs"];
  14. this.notifications = this.moduleManager.modules["notifications"];
  15. this.defaultSong = {
  16. songId: "60ItHLz5WEA",
  17. title: "Faded - Alan Walker",
  18. duration: 212,
  19. skipDuration: 0,
  20. likes: -1,
  21. dislikes: -1,
  22. };
  23. //TEMP
  24. this.cache.runJob("SUB", {
  25. channel: "station.pause",
  26. cb: async (stationId) => {
  27. this.notifications
  28. .runJob("REMOVE", {
  29. subscription: `stations.nextSong?id=${stationId}`,
  30. })
  31. .then();
  32. },
  33. });
  34. this.cache.runJob("SUB", {
  35. channel: "station.resume",
  36. cb: async (stationId) => {
  37. this.runJob("INITIALIZE_STATION", { stationId }).then();
  38. },
  39. });
  40. this.cache.runJob("SUB", {
  41. channel: "station.queueUpdate",
  42. cb: async (stationId) => {
  43. this.runJob("GET_STATION", { stationId }).then(
  44. (station) => {
  45. if (
  46. !station.currentSong &&
  47. station.queue.length > 0
  48. ) {
  49. this.runJob("INITIALIZE_STATION", {
  50. stationId,
  51. }).then();
  52. }
  53. }
  54. );
  55. },
  56. });
  57. this.cache.runJob("SUB", {
  58. channel: "station.newOfficialPlaylist",
  59. cb: async (stationId) => {
  60. this.cache
  61. .runJob("HGET", {
  62. table: "officialPlaylists",
  63. key: stationId,
  64. })
  65. .then((playlistObj) => {
  66. if (playlistObj) {
  67. this.utils.runJob("EMIT_TO_ROOM", {
  68. room: `station.${stationId}`,
  69. args: [
  70. "event:newOfficialPlaylist",
  71. playlistObj.songs,
  72. ],
  73. });
  74. }
  75. });
  76. },
  77. });
  78. const stationModel = (this.stationModel = await this.db.runJob(
  79. "GET_MODEL",
  80. {
  81. modelName: "station",
  82. }
  83. ));
  84. const stationSchema = (this.stationSchema = await this.cache.runJob(
  85. "GET_SCHEMA",
  86. {
  87. schemaName: "station",
  88. }
  89. ));
  90. async.waterfall(
  91. [
  92. (next) => {
  93. this.setStage(2);
  94. this.cache
  95. .runJob("HGETALL", { table: "stations" })
  96. .then((stations) => {
  97. next(null, stations);
  98. })
  99. .catch(next);
  100. },
  101. (stations, next) => {
  102. this.setStage(3);
  103. if (!stations) return next();
  104. let stationIds = Object.keys(stations);
  105. async.each(
  106. stationIds,
  107. (stationId, next) => {
  108. stationModel.findOne(
  109. { _id: stationId },
  110. (err, station) => {
  111. if (err) next(err);
  112. else if (!station) {
  113. this.cache
  114. .runJob("HDEL", {
  115. table: "stations",
  116. key: stationId,
  117. })
  118. .then(() => {
  119. next();
  120. })
  121. .catch(next);
  122. } else next();
  123. }
  124. );
  125. },
  126. next
  127. );
  128. },
  129. (next) => {
  130. this.setStage(4);
  131. stationModel.find({}, next);
  132. },
  133. (stations, next) => {
  134. this.setStage(5);
  135. async.each(
  136. stations,
  137. (station, next2) => {
  138. async.waterfall(
  139. [
  140. (next) => {
  141. this.cache
  142. .runJob("HSET", {
  143. table: "stations",
  144. key: station._id,
  145. value: stationSchema(
  146. station
  147. ),
  148. })
  149. .then((station) =>
  150. next(null, station)
  151. )
  152. .catch(next);
  153. },
  154. (station, next) => {
  155. this.runJob(
  156. "INITIALIZE_STATION",
  157. {
  158. stationId: station._id,
  159. bypassQueue: true,
  160. },
  161. { bypassQueue: true }
  162. )
  163. .then(() => {
  164. next();
  165. })
  166. .catch(next); // bypassQueue is true because otherwise the module will never initialize
  167. },
  168. ],
  169. (err) => {
  170. next2(err);
  171. }
  172. );
  173. },
  174. next
  175. );
  176. },
  177. ],
  178. async (err) => {
  179. if (err) {
  180. err = await this.utils.runJob("GET_ERROR", {
  181. error: err,
  182. });
  183. reject(new Error(err));
  184. } else {
  185. resolve();
  186. }
  187. }
  188. );
  189. });
  190. }
  191. INITIALIZE_STATION(payload) {
  192. //stationId, cb, bypassValidate = false
  193. return new Promise((resolve, reject) => {
  194. // if (typeof cb !== 'function') cb = ()=>{};
  195. async.waterfall(
  196. [
  197. (next) => {
  198. this.runJob(
  199. "GET_STATION",
  200. {
  201. stationId: payload.stationId,
  202. bypassQueue: payload.bypassQueue,
  203. },
  204. { bypassQueue: payload.bypassQueue }
  205. )
  206. .then((station) => {
  207. next(null, station);
  208. })
  209. .catch(next);
  210. },
  211. (station, next) => {
  212. if (!station) return next("Station not found.");
  213. this.notifications
  214. .runJob("UNSCHEDULE", {
  215. name: `stations.nextSong?id=${station._id}`,
  216. })
  217. .then()
  218. .catch();
  219. this.notifications
  220. .runJob("SUBSCRIBE", {
  221. name: `stations.nextSong?id=${station._id}`,
  222. cb: () =>
  223. this.runJob("SKIP_STATION", {
  224. stationId: station._id,
  225. }),
  226. unique: true,
  227. station,
  228. })
  229. .then()
  230. .catch();
  231. if (station.paused) return next(true, station);
  232. next(null, station);
  233. },
  234. (station, next) => {
  235. if (!station.currentSong) {
  236. return this.runJob(
  237. "SKIP_STATION",
  238. {
  239. stationId: station._id,
  240. bypassQueue: payload.bypassQueue,
  241. },
  242. { bypassQueue: payload.bypassQueue }
  243. )
  244. .then((station) => {
  245. next(true, station);
  246. })
  247. .catch(next)
  248. .finally(() => {});
  249. }
  250. let timeLeft =
  251. station.currentSong.duration * 1000 -
  252. (Date.now() -
  253. station.startedAt -
  254. station.timePaused);
  255. if (isNaN(timeLeft)) timeLeft = -1;
  256. if (
  257. station.currentSong.duration * 1000 < timeLeft ||
  258. timeLeft < 0
  259. ) {
  260. this.runJob(
  261. "SKIP_STATION",
  262. {
  263. stationId: station._id,
  264. bypassQueue: payload.bypassQueue,
  265. },
  266. { bypassQueue: payload.bypassQueue }
  267. )
  268. .then((station) => {
  269. next(null, station);
  270. })
  271. .catch(next);
  272. } else {
  273. //name, time, cb, station
  274. this.notifications.runJob("SCHEDULE", {
  275. name: `stations.nextSong?id=${station._id}`,
  276. time: timeLeft,
  277. station,
  278. });
  279. next(null, station);
  280. }
  281. },
  282. ],
  283. async (err, station) => {
  284. if (err && err !== true) {
  285. err = await this.utils.runJob("GET_ERROR", {
  286. error: err,
  287. });
  288. reject(new Error(err));
  289. } else resolve(station);
  290. }
  291. );
  292. });
  293. }
  294. CALCULATE_SONG_FOR_STATION(payload) {
  295. //station, cb, bypassValidate = false
  296. return new Promise(async (resolve, reject) => {
  297. const songModel = await this.db.runJob("GET_MODEL", {
  298. modelName: "song",
  299. });
  300. const stationModel = await this.db.runJob("GET_MODEL", {
  301. modelName: "station",
  302. });
  303. let songList = [];
  304. async.waterfall(
  305. [
  306. (next) => {
  307. if (payload.station.genres.length === 0) return next();
  308. let genresDone = [];
  309. payload.station.genres.forEach((genre) => {
  310. songModel.find({ genres: genre }, (err, songs) => {
  311. if (!err) {
  312. songs.forEach((song) => {
  313. if (songList.indexOf(song._id) === -1) {
  314. let found = false;
  315. song.genres.forEach((songGenre) => {
  316. if (
  317. payload.station.blacklistedGenres.indexOf(
  318. songGenre
  319. ) !== -1
  320. )
  321. found = true;
  322. });
  323. if (!found) {
  324. songList.push(song._id);
  325. }
  326. }
  327. });
  328. }
  329. genresDone.push(genre);
  330. if (
  331. genresDone.length ===
  332. payload.station.genres.length
  333. )
  334. next();
  335. });
  336. });
  337. },
  338. (next) => {
  339. let playlist = [];
  340. songList.forEach(function(songId) {
  341. if (payload.station.playlist.indexOf(songId) === -1)
  342. playlist.push(songId);
  343. });
  344. payload.station.playlist.filter((songId) => {
  345. if (songList.indexOf(songId) !== -1)
  346. playlist.push(songId);
  347. });
  348. this.utils
  349. .runJob("SHUFFLE", { array: playlist })
  350. .then((result) => {
  351. next(null, result.array);
  352. })
  353. .catch(next);
  354. },
  355. (playlist, next) => {
  356. this.runJob(
  357. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  358. {
  359. stationId: payload.station._id,
  360. songList: playlist,
  361. bypassQueue: payload.bypassQueue,
  362. },
  363. { bypassQueue: payload.bypassQueue }
  364. )
  365. .then(() => {
  366. next(null, playlist);
  367. })
  368. .catch(next);
  369. },
  370. (playlist, next) => {
  371. stationModel.updateOne(
  372. { _id: payload.station._id },
  373. { $set: { playlist: playlist } },
  374. { runValidators: true },
  375. (err) => {
  376. this.runJob(
  377. "UPDATE_STATION",
  378. {
  379. stationId: payload.station._id,
  380. bypassQueue: payload.bypassQueue,
  381. },
  382. { bypassQueue: payload.bypassQueue }
  383. )
  384. .then(() => {
  385. next(null, playlist);
  386. })
  387. .catch(next);
  388. }
  389. );
  390. },
  391. ],
  392. (err, newPlaylist) => {
  393. if (err) return reject(new Error(err));
  394. resolve(newPlaylist);
  395. }
  396. );
  397. });
  398. }
  399. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  400. GET_STATION(payload) {
  401. //stationId, cb, bypassValidate = false
  402. return new Promise((resolve, reject) => {
  403. async.waterfall(
  404. [
  405. (next) => {
  406. this.cache
  407. .runJob("HGET", {
  408. table: "stations",
  409. key: payload.stationId,
  410. })
  411. .then((station) => {
  412. next(null, station);
  413. })
  414. .catch(next);
  415. },
  416. (station, next) => {
  417. if (station) return next(true, station);
  418. this.stationModel.findOne(
  419. { _id: payload.stationId },
  420. next
  421. );
  422. },
  423. (station, next) => {
  424. if (station) {
  425. if (station.type === "official") {
  426. this.runJob(
  427. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  428. {
  429. stationId: station._id,
  430. songList: station.playlist,
  431. }
  432. )
  433. .then()
  434. .catch();
  435. }
  436. station = this.stationSchema(station);
  437. this.cache
  438. .runJob("HSET", {
  439. table: "stations",
  440. key: payload.stationId,
  441. value: station,
  442. })
  443. .then()
  444. .catch();
  445. next(true, station);
  446. } else next("Station not found");
  447. },
  448. ],
  449. async (err, station) => {
  450. if (err && err !== true) {
  451. err = await this.utils.runJob("GET_ERROR", {
  452. error: err,
  453. });
  454. reject(new Error(err));
  455. } else resolve(station);
  456. }
  457. );
  458. });
  459. }
  460. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  461. GET_STATION_BY_NAME(payload) {
  462. //stationName, cb
  463. return new Promise(async (resolve, reject) => {
  464. const stationModel = await this.db.runJob("GET_MODEL", {
  465. modelName: "station",
  466. });
  467. async.waterfall(
  468. [
  469. (next) => {
  470. stationModel.findOne(
  471. { name: payload.stationName },
  472. next
  473. );
  474. },
  475. (station, next) => {
  476. if (station) {
  477. if (station.type === "official") {
  478. this.runJob(
  479. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  480. {
  481. stationId: station._id,
  482. songList: station.playlist,
  483. }
  484. );
  485. }
  486. this.cache
  487. .runJob("GET_SCHEMA", { schemaName: "station" })
  488. .then((stationSchema) => {
  489. station = stationSchema(station);
  490. this.cache.runJob("HSET", {
  491. table: "stations",
  492. key: station._id,
  493. value: station,
  494. });
  495. next(true, station);
  496. });
  497. } else next("Station not found");
  498. },
  499. ],
  500. (err, station) => {
  501. if (err && err !== true) return reject(new Error(err));
  502. resolve(station);
  503. }
  504. );
  505. });
  506. }
  507. UPDATE_STATION(payload) {
  508. //stationId, cb, bypassValidate = false
  509. return new Promise((resolve, reject) => {
  510. async.waterfall(
  511. [
  512. (next) => {
  513. this.stationModel.findOne(
  514. { _id: payload.stationId },
  515. next
  516. );
  517. },
  518. (station, next) => {
  519. if (!station) {
  520. this.cache
  521. .runJob("HDEL", {
  522. table: "stations",
  523. key: payload.stationId,
  524. })
  525. .then()
  526. .catch();
  527. return next("Station not found");
  528. }
  529. this.cache
  530. .runJob("HSET", {
  531. table: "stations",
  532. key: payload.stationId,
  533. value: station,
  534. })
  535. .then((station) => {
  536. next(null, station);
  537. })
  538. .catch(next);
  539. },
  540. ],
  541. async (err, station) => {
  542. if (err && err !== true) {
  543. err = await this.utils.runJob("GET_ERROR", {
  544. error: err,
  545. });
  546. reject(new Error(err));
  547. } else resolve(station);
  548. }
  549. );
  550. });
  551. }
  552. CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  553. //stationId, songList, cb, bypassValidate = false
  554. return new Promise(async (resolve, reject) => {
  555. const officialPlaylistSchema = await this.cache.runJob(
  556. "GET_SCHEMA",
  557. {
  558. schemaName: "officialPlaylist",
  559. }
  560. );
  561. let lessInfoPlaylist = [];
  562. async.each(
  563. payload.songList,
  564. (song, next) => {
  565. this.songs
  566. .runJob("GET_SONG", { id: song })
  567. .then((response) => {
  568. const song = response.song;
  569. if (song) {
  570. let newSong = {
  571. songId: song.songId,
  572. title: song.title,
  573. artists: song.artists,
  574. duration: song.duration,
  575. };
  576. lessInfoPlaylist.push(newSong);
  577. }
  578. })
  579. .finally(() => {
  580. next();
  581. });
  582. },
  583. () => {
  584. this.cache
  585. .runJob("HSET", {
  586. table: "officialPlaylists",
  587. key: payload.stationId,
  588. value: officialPlaylistSchema(
  589. payload.stationId,
  590. lessInfoPlaylist
  591. ),
  592. })
  593. .finally(() => {
  594. this.cache.runJob("PUB", {
  595. channel: "station.newOfficialPlaylist",
  596. value: payload.stationId,
  597. });
  598. resolve();
  599. });
  600. }
  601. );
  602. });
  603. }
  604. SKIP_STATION(payload) {
  605. //stationId
  606. return new Promise((resolve, reject) => {
  607. this.log("INFO", `Skipping station ${payload.stationId}.`);
  608. this.log(
  609. "STATION_ISSUE",
  610. `SKIP_STATION_CB - Station ID: ${payload.stationId}.`
  611. );
  612. async.waterfall(
  613. [
  614. (next) => {
  615. this.runJob(
  616. "GET_STATION",
  617. {
  618. stationId: payload.stationId,
  619. bypassQueue: payload.bypassQueue,
  620. },
  621. { bypassQueue: payload.bypassQueue }
  622. )
  623. .then((station) => {
  624. next(null, station);
  625. })
  626. .catch(() => {});
  627. },
  628. (station, next) => {
  629. if (!station) return next("Station not found.");
  630. if (
  631. station.type === "community" &&
  632. station.partyMode &&
  633. station.queue.length === 0
  634. )
  635. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  636. if (
  637. station.type === "community" &&
  638. station.partyMode &&
  639. station.queue.length > 0
  640. ) {
  641. // Community station with party mode enabled and songs in the queue
  642. if (station.paused) {
  643. return next(null, null, -19, station);
  644. } else {
  645. return this.stationModel.updateOne(
  646. { _id: payload.stationId },
  647. {
  648. $pull: {
  649. queue: {
  650. _id: station.queue[0]._id,
  651. },
  652. },
  653. },
  654. (err) => {
  655. if (err) return next(err);
  656. next(
  657. null,
  658. station.queue[0],
  659. -12,
  660. station
  661. );
  662. }
  663. );
  664. }
  665. }
  666. if (
  667. station.type === "community" &&
  668. !station.partyMode
  669. ) {
  670. this.db
  671. .runJob("GET_MODEL", { modelName: "playlist" })
  672. .then((playlistModel) => {
  673. return playlistModel.findOne(
  674. { _id: station.privatePlaylist },
  675. (err, playlist) => {
  676. if (err) return next(err);
  677. if (!playlist)
  678. return next(
  679. null,
  680. null,
  681. -13,
  682. station
  683. );
  684. playlist = playlist.songs;
  685. if (playlist.length > 0) {
  686. let currentSongIndex;
  687. if (
  688. station.currentSongIndex <
  689. playlist.length - 1
  690. )
  691. currentSongIndex =
  692. station.currentSongIndex +
  693. 1;
  694. else currentSongIndex = 0;
  695. let callback = (err, song) => {
  696. if (err) return next(err);
  697. if (song)
  698. return next(
  699. null,
  700. song,
  701. currentSongIndex,
  702. station
  703. );
  704. else {
  705. let song =
  706. playlist[
  707. currentSongIndex
  708. ];
  709. let currentSong = {
  710. songId: song.songId,
  711. title: song.title,
  712. duration:
  713. song.duration,
  714. likes: -1,
  715. dislikes: -1,
  716. };
  717. return next(
  718. null,
  719. currentSong,
  720. currentSongIndex,
  721. station
  722. );
  723. }
  724. };
  725. if (
  726. playlist[currentSongIndex]
  727. ._id
  728. )
  729. this.songs
  730. .runJob("GET_SONG", {
  731. id:
  732. playlist[
  733. currentSongIndex
  734. ]._id,
  735. })
  736. .then((response) =>
  737. callback(
  738. null,
  739. response.song
  740. )
  741. )
  742. .catch(callback);
  743. else
  744. this.songs
  745. .runJob(
  746. "GET_SONG_FROM_ID",
  747. {
  748. songId:
  749. playlist[
  750. currentSongIndex
  751. ].songId,
  752. }
  753. )
  754. .then((response) =>
  755. callback(
  756. null,
  757. response.song
  758. )
  759. )
  760. .catch(callback);
  761. } else
  762. return next(
  763. null,
  764. null,
  765. -14,
  766. station
  767. );
  768. }
  769. );
  770. });
  771. }
  772. if (
  773. station.type === "official" &&
  774. station.playlist.length === 0
  775. ) {
  776. return this.runJob(
  777. "CALCULATE_SONG_FOR_STATION",
  778. { station, bypassQueue: payload.bypassQueue },
  779. { bypassQueue: payload.bypassQueue }
  780. )
  781. .then((playlist) => {
  782. if (playlist.length === 0)
  783. return next(
  784. null,
  785. this.defaultSong,
  786. 0,
  787. station
  788. );
  789. else {
  790. this.songs
  791. .runJob("GET_SONG", {
  792. id: playlist[0],
  793. })
  794. .then((response) => {
  795. next(
  796. null,
  797. response.song,
  798. 0,
  799. station
  800. );
  801. })
  802. .catch((err) => {
  803. return next(
  804. null,
  805. this.defaultSong,
  806. 0,
  807. station
  808. );
  809. });
  810. }
  811. })
  812. .catch(next);
  813. }
  814. if (
  815. station.type === "official" &&
  816. station.playlist.length > 0
  817. ) {
  818. async.doUntil(
  819. (next) => {
  820. if (
  821. station.currentSongIndex <
  822. station.playlist.length - 1
  823. ) {
  824. this.songs
  825. .runJob("GET_SONG", {
  826. id:
  827. station.playlist[
  828. station.currentSongIndex +
  829. 1
  830. ],
  831. })
  832. .then((response) => {
  833. return next(
  834. null,
  835. response.song,
  836. station.currentSongIndex + 1
  837. );
  838. })
  839. .catch((err) => {
  840. station.currentSongIndex++;
  841. next(null, null, null);
  842. });
  843. } else {
  844. this.runJob(
  845. "CALCULATE_SONG_FOR_STATION",
  846. {
  847. station,
  848. bypassQueue:
  849. payload.bypassQueue,
  850. },
  851. { bypassQueue: payload.bypassQueue }
  852. )
  853. .then((newPlaylist) => {
  854. this.songs
  855. .runJob("GET_SONG", { id: newPlaylist[0] })
  856. .then((response) => {
  857. station.playlist = newPlaylist;
  858. next(null, response.song, 0);
  859. })
  860. .catch(err => {
  861. return next(
  862. null,
  863. this.defaultSong,
  864. 0
  865. );
  866. });
  867. })
  868. .catch((err) => {
  869. next(null, this.defaultSong, 0);
  870. });
  871. }
  872. },
  873. (song, currentSongIndex, next) => {
  874. if (!!song)
  875. return next(
  876. null,
  877. true,
  878. currentSongIndex
  879. );
  880. else return next(null, false);
  881. },
  882. (err, song, currentSongIndex) => {
  883. return next(
  884. err,
  885. song,
  886. currentSongIndex,
  887. station
  888. );
  889. }
  890. );
  891. }
  892. },
  893. (song, currentSongIndex, station, next) => {
  894. let $set = {};
  895. if (song === null) $set.currentSong = null;
  896. else if (song.likes === -1 && song.dislikes === -1) {
  897. $set.currentSong = {
  898. songId: song.songId,
  899. title: song.title,
  900. duration: song.duration,
  901. skipDuration: 0,
  902. likes: -1,
  903. dislikes: -1,
  904. };
  905. } else {
  906. $set.currentSong = {
  907. songId: song.songId,
  908. title: song.title,
  909. artists: song.artists,
  910. duration: song.duration,
  911. likes: song.likes,
  912. dislikes: song.dislikes,
  913. skipDuration: song.skipDuration,
  914. thumbnail: song.thumbnail,
  915. };
  916. }
  917. if (currentSongIndex >= 0)
  918. $set.currentSongIndex = currentSongIndex;
  919. $set.startedAt = Date.now();
  920. $set.timePaused = 0;
  921. if (station.paused) $set.pausedAt = Date.now();
  922. next(null, $set, station);
  923. },
  924. ($set, station, next) => {
  925. this.stationModel.updateOne(
  926. { _id: station._id },
  927. { $set },
  928. (err) => {
  929. this.runJob(
  930. "UPDATE_STATION",
  931. {
  932. stationId: station._id,
  933. bypassQueue: payload.bypassQueue,
  934. },
  935. { bypassQueue: payload.bypassQueue }
  936. )
  937. .then((station) => {
  938. if (
  939. station.type === "community" &&
  940. station.partyMode === true
  941. )
  942. this.cache
  943. .runJob("PUB", {
  944. channel:
  945. "station.queueUpdate",
  946. value: payload.stationId,
  947. })
  948. .then()
  949. .catch();
  950. next(null, station);
  951. })
  952. .catch(next);
  953. }
  954. );
  955. },
  956. ],
  957. async (err, station) => {
  958. if (err) {
  959. err = await this.utils.runJob("GET_ERROR", {
  960. error: err,
  961. });
  962. this.log(
  963. "ERROR",
  964. `Skipping station "${payload.stationId}" failed. "${err}"`
  965. );
  966. reject(new Error(err));
  967. } else {
  968. if (
  969. station.currentSong !== null &&
  970. station.currentSong.songId !== undefined
  971. ) {
  972. station.currentSong.skipVotes = 0;
  973. }
  974. //TODO Pub/Sub this
  975. this.utils
  976. .runJob("EMIT_TO_ROOM", {
  977. room: `station.${station._id}`,
  978. args: [
  979. "event:songs.next",
  980. {
  981. currentSong: station.currentSong,
  982. startedAt: station.startedAt,
  983. paused: station.paused,
  984. timePaused: 0,
  985. },
  986. ],
  987. })
  988. .then()
  989. .catch();
  990. if (station.privacy === "public") {
  991. this.utils
  992. .runJob("EMIT_TO_ROOM", {
  993. room: "home",
  994. args: [
  995. "event:station.nextSong",
  996. station._id,
  997. station.currentSong,
  998. ],
  999. })
  1000. .then()
  1001. .catch();
  1002. } else {
  1003. let sockets = await this.utils.runJob(
  1004. "GET_ROOM_SOCKETS",
  1005. { room: "home" }
  1006. );
  1007. for (let socketId in sockets) {
  1008. let socket = sockets[socketId];
  1009. let session = sockets[socketId].session;
  1010. if (session.sessionId) {
  1011. this.cache
  1012. .runJob("HGET", {
  1013. table: "sessions",
  1014. key: session.sessionId,
  1015. })
  1016. .then((session) => {
  1017. if (session) {
  1018. this.db
  1019. .runJob("GET_MODEL", {
  1020. modelName: "user",
  1021. })
  1022. .then((userModel) => {
  1023. userModel.findOne(
  1024. {
  1025. _id:
  1026. session.userId,
  1027. },
  1028. (err, user) => {
  1029. if (
  1030. !err &&
  1031. user
  1032. ) {
  1033. if (
  1034. user.role ===
  1035. "admin"
  1036. )
  1037. socket.emit(
  1038. "event:station.nextSong",
  1039. station._id,
  1040. station.currentSong
  1041. );
  1042. else if (
  1043. station.type ===
  1044. "community" &&
  1045. station.owner ===
  1046. session.userId
  1047. )
  1048. socket.emit(
  1049. "event:station.nextSong",
  1050. station._id,
  1051. station.currentSong
  1052. );
  1053. }
  1054. }
  1055. );
  1056. });
  1057. }
  1058. });
  1059. }
  1060. }
  1061. }
  1062. if (
  1063. station.currentSong !== null &&
  1064. station.currentSong.songId !== undefined
  1065. ) {
  1066. this.utils.runJob("SOCKETS_JOIN_SONG_ROOM", {
  1067. sockets: await this.utils.runJob(
  1068. "GET_ROOM_SOCKETS",
  1069. { room: `station.${station._id}` }
  1070. ),
  1071. room: `song.${station.currentSong.songId}`,
  1072. });
  1073. if (!station.paused) {
  1074. this.notifications.runJob("SCHEDULE", {
  1075. name: `stations.nextSong?id=${station._id}`,
  1076. time: station.currentSong.duration * 1000,
  1077. station,
  1078. });
  1079. }
  1080. } else {
  1081. this.utils
  1082. .runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  1083. sockets: await this.utils.runJob(
  1084. "GET_ROOM_SOCKETS",
  1085. { room: `station.${station._id}` }
  1086. ),
  1087. })
  1088. .then()
  1089. .catch();
  1090. }
  1091. resolve({ station: station });
  1092. }
  1093. }
  1094. );
  1095. });
  1096. }
  1097. CAN_USER_VIEW_STATION(payload) {
  1098. // station, userId, hideUnlisted, cb
  1099. return new Promise((resolve, reject) => {
  1100. async.waterfall(
  1101. [
  1102. (next) => {
  1103. if (payload.station.privacy === "public")
  1104. return next(true);
  1105. if (payload.station.privacy === "unlisted")
  1106. if (payload.hideUnlisted === true)
  1107. return next();
  1108. else
  1109. return next(true);
  1110. if (!payload.userId) return next("Not allowed");
  1111. next();
  1112. },
  1113. (next) => {
  1114. this.db
  1115. .runJob("GET_MODEL", {
  1116. modelName: "user",
  1117. })
  1118. .then((userModel) => {
  1119. userModel.findOne(
  1120. { _id: payload.userId },
  1121. next
  1122. );
  1123. });
  1124. },
  1125. (user, next) => {
  1126. if (!user) return next("Not allowed");
  1127. if (user.role === "admin") return next(true);
  1128. if (payload.station.type === "official")
  1129. return next("Not allowed");
  1130. if (payload.station.owner === payload.userId)
  1131. return next(true);
  1132. next("Not allowed");
  1133. },
  1134. ],
  1135. async (errOrResult) => {
  1136. if (errOrResult !== true && errOrResult !== "Not allowed") {
  1137. errOrResult = await this.utils.runJob("GET_ERROR", {
  1138. error: errOrResult,
  1139. });
  1140. reject(new Error(errOrResult));
  1141. } else {
  1142. resolve(errOrResult === true ? true : false);
  1143. }
  1144. }
  1145. );
  1146. });
  1147. }
  1148. }
  1149. module.exports = new StationsModule();