stations.js 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148
  1. const CoreClass = require("../core.js");
  2. const async = require("async");
  3. let subscription = null;
  4. class StationsModule extends CoreClass {
  5. constructor() {
  6. super("stations");
  7. }
  8. initialize() {
  9. return new Promise(async (resolve, reject) => {
  10. this.cache = this.moduleManager.modules["cache"];
  11. this.db = this.moduleManager.modules["db"];
  12. this.utils = this.moduleManager.modules["utils"];
  13. this.songs = this.moduleManager.modules["songs"];
  14. this.notifications = this.moduleManager.modules["notifications"];
  15. this.defaultSong = {
  16. songId: "60ItHLz5WEA",
  17. title: "Faded - Alan Walker",
  18. duration: 212,
  19. skipDuration: 0,
  20. likes: -1,
  21. dislikes: -1,
  22. };
  23. //TEMP
  24. this.cache.runJob("SUB", {
  25. channel: "station.pause",
  26. cb: async (stationId) => {
  27. this.notifications
  28. .runJob("REMOVE", {
  29. subscription: `stations.nextSong?id=${stationId}`,
  30. })
  31. .then();
  32. },
  33. });
  34. this.cache.runJob("SUB", {
  35. channel: "station.resume",
  36. cb: async (stationId) => {
  37. this.runJob("INITIALIZE_STATION", { stationId }).then();
  38. },
  39. });
  40. this.cache.runJob("SUB", {
  41. channel: "station.queueUpdate",
  42. cb: async (stationId) => {
  43. this.runJob("GET_STATION", { stationId }).then(
  44. (station) => {
  45. if (
  46. !station.currentSong &&
  47. station.queue.length > 0
  48. ) {
  49. this.runJob("INITIALIZE_STATION", {
  50. stationId,
  51. }).then();
  52. }
  53. }
  54. );
  55. },
  56. });
  57. this.cache.runJob("SUB", {
  58. channel: "station.newOfficialPlaylist",
  59. cb: async (stationId) => {
  60. this.cache
  61. .runJob("HGET", {
  62. table: "officialPlaylists",
  63. key: stationId,
  64. })
  65. .then((playlistObj) => {
  66. if (playlistObj) {
  67. this.utils.runJob("EMIT_TO_ROOM", {
  68. room: `station.${stationId}`,
  69. args: [
  70. "event:newOfficialPlaylist",
  71. playlistObj.songs,
  72. ],
  73. });
  74. }
  75. });
  76. },
  77. });
  78. const stationModel = (this.stationModel = await this.db.runJob(
  79. "GET_MODEL",
  80. {
  81. modelName: "station",
  82. }
  83. ));
  84. const stationSchema = (this.stationSchema = await this.cache.runJob(
  85. "GET_SCHEMA",
  86. {
  87. schemaName: "station",
  88. }
  89. ));
  90. async.waterfall(
  91. [
  92. (next) => {
  93. this.setStage(2);
  94. this.cache
  95. .runJob("HGETALL", { table: "stations" })
  96. .then((stations) => next(null, stations))
  97. .catch(next);
  98. },
  99. (stations, next) => {
  100. this.setStage(3);
  101. if (!stations) return next();
  102. let stationIds = Object.keys(stations);
  103. async.each(
  104. stationIds,
  105. (stationId, next) => {
  106. stationModel.findOne(
  107. { _id: stationId },
  108. (err, station) => {
  109. if (err) next(err);
  110. else if (!station) {
  111. this.cache
  112. .runJob("HDEL", {
  113. table: "stations",
  114. key: stationId,
  115. })
  116. .then(() => next())
  117. .catch(next);
  118. } else next();
  119. }
  120. );
  121. },
  122. next
  123. );
  124. },
  125. (next) => {
  126. this.setStage(4);
  127. stationModel.find({}, next);
  128. },
  129. (stations, next) => {
  130. this.setStage(5);
  131. async.each(
  132. stations,
  133. (station, next2) => {
  134. async.waterfall(
  135. [
  136. (next) => {
  137. this.cache
  138. .runJob("HSET", {
  139. table: "stations",
  140. key: station._id,
  141. value: stationSchema(
  142. station
  143. ),
  144. })
  145. .then((station) =>
  146. next(null, station)
  147. )
  148. .catch(next);
  149. },
  150. (station, next) => {
  151. this.runJob(
  152. "INITIALIZE_STATION",
  153. {
  154. stationId: station._id,
  155. bypassQueue: true,
  156. },
  157. { bypassQueue: true }
  158. )
  159. .then(() => next())
  160. .catch(next); // bypassQueue is true because otherwise the module will never initialize
  161. },
  162. ],
  163. (err) => {
  164. next2(err);
  165. }
  166. );
  167. },
  168. next
  169. );
  170. },
  171. ],
  172. async (err) => {
  173. if (err) {
  174. err = await this.utils.runJob("GET_ERROR", {
  175. error: err,
  176. });
  177. reject(new Error(err));
  178. } else {
  179. resolve();
  180. }
  181. }
  182. );
  183. });
  184. }
  185. INITIALIZE_STATION(payload) {
  186. //stationId, cb, bypassValidate = false
  187. return new Promise((resolve, reject) => {
  188. // if (typeof cb !== 'function') cb = ()=>{};
  189. async.waterfall(
  190. [
  191. (next) => {
  192. this.runJob(
  193. "GET_STATION",
  194. { stationId: payload.stationId },
  195. { bypassQueue: payload.bypassQueue }
  196. )
  197. .then((station) => next(null, station))
  198. .catch(next);
  199. },
  200. (station, next) => {
  201. if (!station) return next("Station not found.");
  202. this.notifications
  203. .runJob("UNSCHEDULE", {
  204. subscription: `stations.nextSong?id=${station._id}`,
  205. })
  206. .then()
  207. .catch();
  208. this.notifications
  209. .runJob("SUBSCRIBE", {
  210. subscription: `stations.nextSong?id=${station._id}`,
  211. cb: () =>
  212. this.runJob("SKIP_STATION", {
  213. stationId: station._id,
  214. }),
  215. unique: true,
  216. station,
  217. })
  218. .then()
  219. .catch();
  220. if (station.paused) return next(true, station);
  221. next(null, station);
  222. },
  223. (station, next) => {
  224. if (!station.currentSong) {
  225. return this.runJob(
  226. "SKIP_STATION",
  227. {
  228. stationId: station._id,
  229. bypassQueue: payload.bypassQueue,
  230. },
  231. { bypassQueue: payload.bypassQueue }
  232. )
  233. .then((station) => next(true, station))
  234. .catch(next)
  235. .finally(() => {});
  236. }
  237. let timeLeft =
  238. station.currentSong.duration * 1000 -
  239. (Date.now() -
  240. station.startedAt -
  241. station.timePaused);
  242. if (isNaN(timeLeft)) timeLeft = -1;
  243. if (
  244. station.currentSong.duration * 1000 < timeLeft ||
  245. timeLeft < 0
  246. ) {
  247. this.runJob(
  248. "SKIP_STATION",
  249. { stationId: station._id },
  250. { bypassQueue: payload.bypassQueue }
  251. )
  252. .then((station) => next(null, station))
  253. .catch(next);
  254. } else {
  255. //name, time, cb, station
  256. this.notifications.runJob("SCHEDULE", {
  257. name: `stations.nextSong?id=${station._id}`,
  258. time: timeLeft,
  259. station,
  260. });
  261. next(null, station);
  262. }
  263. },
  264. ],
  265. async (err, station) => {
  266. if (err && err !== true) {
  267. err = await this.utils.runJob("GET_ERROR", {
  268. error: err,
  269. });
  270. reject(new Error(err));
  271. } else resolve(station);
  272. }
  273. );
  274. });
  275. }
  276. CALCULATE_SONG_FOR_STATION(payload) {
  277. //station, cb, bypassValidate = false
  278. return new Promise(async (resolve, reject) => {
  279. const songModel = await this.db.runJob("GET_MODEL", {
  280. modelName: "song",
  281. });
  282. const stationModel = await this.db.runJob("GET_MODEL", {
  283. modelName: "station",
  284. });
  285. let songList = [];
  286. async.waterfall(
  287. [
  288. (next) => {
  289. if (payload.station.genres.length === 0) return next();
  290. let genresDone = [];
  291. payload.station.genres.forEach((genre) => {
  292. songModel.find({ genres: genre }, (err, songs) => {
  293. if (!err) {
  294. songs.forEach((song) => {
  295. if (songList.indexOf(song._id) === -1) {
  296. let found = false;
  297. song.genres.forEach((songGenre) => {
  298. if (
  299. payload.station.blacklistedGenres.indexOf(
  300. songGenre
  301. ) !== -1
  302. )
  303. found = true;
  304. });
  305. if (!found) {
  306. songList.push(song._id);
  307. }
  308. }
  309. });
  310. }
  311. genresDone.push(genre);
  312. if (
  313. genresDone.length ===
  314. payload.station.genres.length
  315. )
  316. next();
  317. });
  318. });
  319. },
  320. (next) => {
  321. let playlist = [];
  322. songList.forEach(function(songId) {
  323. if (payload.station.playlist.indexOf(songId) === -1)
  324. playlist.push(songId);
  325. });
  326. payload.station.playlist.filter((songId) => {
  327. if (songList.indexOf(songId) !== -1)
  328. playlist.push(songId);
  329. });
  330. this.utils
  331. .runJob("SHUFFLE", { array: playlist })
  332. .then((playlist) => next(null, playlist))
  333. .catch(next);
  334. },
  335. (playlist, next) => {
  336. this.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  337. stationId,
  338. songList: playlist,
  339. })
  340. .then(() => next(null, playlist))
  341. .catch(next);
  342. },
  343. (playlist, next) => {
  344. stationModel.updateOne(
  345. { _id: station._id },
  346. { $set: { playlist: playlist } },
  347. { runValidators: true },
  348. (err) => {
  349. this.runJob("UPDATE_STATION", {
  350. stationId: station._id,
  351. })
  352. .then(() => next(null, playlist))
  353. .catch(next);
  354. }
  355. );
  356. },
  357. ],
  358. (err, newPlaylist) => {
  359. if (err) return reject(new Error(err));
  360. resolve(newPlaylist);
  361. }
  362. );
  363. });
  364. }
  365. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  366. GET_STATION(payload) {
  367. //stationId, cb, bypassValidate = false
  368. return new Promise((resolve, reject) => {
  369. async.waterfall(
  370. [
  371. (next) => {
  372. this.cache
  373. .runJob("HGET", {
  374. table: "stations",
  375. key: payload.stationId,
  376. })
  377. .then((station) => next(null, station))
  378. .catch(next);
  379. },
  380. (station, next) => {
  381. if (station) return next(true, station);
  382. this.stationModel.findOne(
  383. { _id: payload.stationId },
  384. next
  385. );
  386. },
  387. (station, next) => {
  388. if (station) {
  389. if (station.type === "official") {
  390. this.runJob(
  391. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  392. {
  393. stationId: station._id,
  394. songList: station.playlist,
  395. }
  396. )
  397. .then()
  398. .catch();
  399. }
  400. station = this.stationSchema(station);
  401. this.cache
  402. .runJob("HSET", {
  403. table: "stations",
  404. key: payload.stationId,
  405. value: station,
  406. })
  407. .then()
  408. .catch();
  409. next(true, station);
  410. } else next("Station not found");
  411. },
  412. ],
  413. async (err, station) => {
  414. if (err && err !== true) {
  415. err = await this.utils.runJob("GET_ERROR", {
  416. error: err,
  417. });
  418. reject(new Error(err));
  419. } else resolve(station);
  420. }
  421. );
  422. });
  423. }
  424. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  425. GET_STATION_BY_NAME(payload) {
  426. //stationName, cb
  427. return new Promise(async (resolve, reject) => {
  428. const stationModel = await this.db.runJob("GET_MODEL", {
  429. modelName: "station",
  430. });
  431. async.waterfall(
  432. [
  433. (next) => {
  434. stationModel.findOne(
  435. { name: payload.stationName },
  436. next
  437. );
  438. },
  439. (station, next) => {
  440. if (station) {
  441. if (station.type === "official") {
  442. this.runJob(
  443. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  444. {
  445. stationId: station._id,
  446. songList: station.playlist,
  447. }
  448. );
  449. }
  450. this.cache
  451. .runJob("GET_SCHEMA", { schemaName: "station" })
  452. .then((stationSchema) => {
  453. station = stationSchema(station);
  454. this.cache.runJob("HSET", {
  455. table: "stations",
  456. key: station._id,
  457. value: station,
  458. });
  459. next(true, station);
  460. });
  461. } else next("Station not found");
  462. },
  463. ],
  464. (err, station) => {
  465. if (err && err !== true) return reject(new Error(err));
  466. resolve(station);
  467. }
  468. );
  469. });
  470. }
  471. UPDATE_STATION(payload) {
  472. //stationId, cb, bypassValidate = false
  473. return new Promise((resolve, reject) => {
  474. async.waterfall(
  475. [
  476. (next) => {
  477. this.stationModel.findOne(
  478. { _id: payload.stationId },
  479. next
  480. );
  481. },
  482. (station, next) => {
  483. if (!station) {
  484. this.cache
  485. .runJob("HDEL", {
  486. table: "stations",
  487. key: payload.stationId,
  488. })
  489. .then()
  490. .catch();
  491. return next("Station not found");
  492. }
  493. this.cache
  494. .runJob("HSET", {
  495. table: "stations",
  496. key: payload.stationId,
  497. value: station,
  498. })
  499. .then((station) => next(null, station))
  500. .catch(next);
  501. },
  502. ],
  503. async (err, station) => {
  504. if (err && err !== true) {
  505. err = await this.utils.runJob("GET_ERROR", {
  506. error: err,
  507. });
  508. reject(new Error(err));
  509. } else resolve(station);
  510. }
  511. );
  512. });
  513. }
  514. CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  515. //stationId, songList, cb, bypassValidate = false
  516. return new Promise(async (resolve, reject) => {
  517. const officialPlaylistSchema = await this.cache.runJob(
  518. "GET_SCHEMA",
  519. {
  520. schemaName: "officialPlaylist",
  521. }
  522. );
  523. let lessInfoPlaylist = [];
  524. async.each(
  525. payload.songList,
  526. (song, next) => {
  527. this.songs
  528. .runJob("GET_SONG", { id: song })
  529. .then((response) => {
  530. const song = response.song;
  531. if (song) {
  532. let newSong = {
  533. songId: song.songId,
  534. title: song.title,
  535. artists: song.artists,
  536. duration: song.duration,
  537. };
  538. lessInfoPlaylist.push(newSong);
  539. }
  540. })
  541. .finally(() => {
  542. next();
  543. });
  544. },
  545. () => {
  546. this.cache
  547. .runJob("HSET", {
  548. table: "officialPlaylists",
  549. key: payload.stationId,
  550. value: officialPlaylistSchema(
  551. payload.stationId,
  552. lessInfoPlaylist
  553. ),
  554. })
  555. .finally(() => {
  556. this.cache.runJob("PUB", {
  557. channel: "station.newOfficialPlaylist",
  558. value: payload.stationId,
  559. });
  560. resolve();
  561. });
  562. }
  563. );
  564. });
  565. }
  566. SKIP_STATION(payload) {
  567. //stationId
  568. return new Promise((resolve, reject) => {
  569. this.log("INFO", `Skipping station ${payload.stationId}.`);
  570. this.log(
  571. "STATION_ISSUE",
  572. `SKIP_STATION_CB - Station ID: ${payload.stationId}.`
  573. );
  574. async.waterfall(
  575. [
  576. (next) => {
  577. this.runJob(
  578. "GET_STATION",
  579. {
  580. stationId: payload.stationId,
  581. },
  582. { bypassQueue: payload.bypassQueue }
  583. )
  584. .then((station) => {
  585. next(null, station);
  586. })
  587. .catch(() => {});
  588. },
  589. (station, next) => {
  590. if (!station) return next("Station not found.");
  591. if (
  592. station.type === "community" &&
  593. station.partyMode &&
  594. station.queue.length === 0
  595. )
  596. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  597. if (
  598. station.type === "community" &&
  599. station.partyMode &&
  600. station.queue.length > 0
  601. ) {
  602. // Community station with party mode enabled and songs in the queue
  603. if (station.paused) {
  604. return next(null, null, -19, station);
  605. } else {
  606. return this.stationModel.updateOne(
  607. { _id: payload.stationId },
  608. {
  609. $pull: {
  610. queue: {
  611. _id: station.queue[0]._id,
  612. },
  613. },
  614. },
  615. (err) => {
  616. if (err) return next(err);
  617. next(
  618. null,
  619. station.queue[0],
  620. -12,
  621. station
  622. );
  623. }
  624. );
  625. }
  626. }
  627. if (
  628. station.type === "community" &&
  629. !station.partyMode
  630. ) {
  631. this.db
  632. .runJob("GET_MODEL", { modelName: "playlist" })
  633. .then((playlistModel) => {
  634. return playlistModel.findOne(
  635. { _id: station.privatePlaylist },
  636. (err, playlist) => {
  637. if (err) return next(err);
  638. if (!playlist)
  639. return next(
  640. null,
  641. null,
  642. -13,
  643. station
  644. );
  645. playlist = playlist.songs;
  646. if (playlist.length > 0) {
  647. let currentSongIndex;
  648. if (
  649. station.currentSongIndex <
  650. playlist.length - 1
  651. )
  652. currentSongIndex =
  653. station.currentSongIndex +
  654. 1;
  655. else currentSongIndex = 0;
  656. let callback = (err, song) => {
  657. if (err) return next(err);
  658. if (song)
  659. return next(
  660. null,
  661. song,
  662. currentSongIndex,
  663. station
  664. );
  665. else {
  666. let song =
  667. playlist[
  668. currentSongIndex
  669. ];
  670. let currentSong = {
  671. songId: song.songId,
  672. title: song.title,
  673. duration:
  674. song.duration,
  675. likes: -1,
  676. dislikes: -1,
  677. };
  678. return next(
  679. null,
  680. currentSong,
  681. currentSongIndex,
  682. station
  683. );
  684. }
  685. };
  686. if (
  687. playlist[currentSongIndex]
  688. ._id
  689. )
  690. this.songs
  691. .runJob("GET_SONG", {
  692. id:
  693. playlist[
  694. currentSongIndex
  695. ]._id,
  696. })
  697. .then((response) =>
  698. callback(
  699. null,
  700. response.song
  701. )
  702. )
  703. .catch(callback);
  704. else
  705. this.songs
  706. .runJob(
  707. "GET_SONG_FROM_ID",
  708. {
  709. songId:
  710. playlist[
  711. currentSongIndex
  712. ].songId,
  713. }
  714. )
  715. .then((response) =>
  716. callback(
  717. null,
  718. response.song
  719. )
  720. )
  721. .catch(callback);
  722. } else
  723. return next(
  724. null,
  725. null,
  726. -14,
  727. station
  728. );
  729. }
  730. );
  731. });
  732. }
  733. if (
  734. station.type === "official" &&
  735. station.playlist.length === 0
  736. ) {
  737. return this.runJob(
  738. "CALCULATE_SONG_FOR_STATION",
  739. { station, bypassQueue: payload.bypassQueue },
  740. { bypassQueue: payload.bypassQueue }
  741. )
  742. .then((playlist) => {
  743. if (playlist.length === 0)
  744. return next(
  745. null,
  746. this.defaultSong,
  747. 0,
  748. station
  749. );
  750. else {
  751. this.songs
  752. .runJob("GET_SONG", {
  753. id: playlist[0],
  754. })
  755. .then((response) => {
  756. next(
  757. null,
  758. response.song,
  759. 0,
  760. station
  761. );
  762. })
  763. .catch((err) => {
  764. return next(
  765. null,
  766. this.defaultSong,
  767. 0,
  768. station
  769. );
  770. });
  771. }
  772. })
  773. .catch(next);
  774. }
  775. if (
  776. station.type === "official" &&
  777. station.playlist.length > 0
  778. ) {
  779. async.doUntil(
  780. (next) => {
  781. if (
  782. station.currentSongIndex <
  783. station.playlist.length - 1
  784. ) {
  785. this.songs
  786. .runJob("GET_SONG", {
  787. id:
  788. station.playlist[
  789. station.currentSongIndex +
  790. 1
  791. ],
  792. })
  793. .then((response) => {
  794. return next(
  795. null,
  796. response.song,
  797. station.currentSongIndex + 1
  798. );
  799. })
  800. .catch((err) => {
  801. station.currentSongIndex++;
  802. next(null, null, null);
  803. });
  804. } else {
  805. this.runJob(
  806. "CALCULATE_SONG_FOR_STATION",
  807. {
  808. station,
  809. bypassQueue:
  810. payload.bypassQueue,
  811. }
  812. )
  813. .then((newPlaylist) => {
  814. this.songs.getSong(
  815. newPlaylist[0],
  816. (err, song) => {
  817. if (err || !song)
  818. return next(
  819. null,
  820. this
  821. .defaultSong,
  822. 0
  823. );
  824. station.playlist = newPlaylist;
  825. next(null, song, 0);
  826. }
  827. );
  828. })
  829. .catch((err) => {
  830. next(null, this.defaultSong, 0);
  831. });
  832. }
  833. },
  834. (song, currentSongIndex, next) => {
  835. if (!!song)
  836. return next(
  837. null,
  838. true,
  839. currentSongIndex
  840. );
  841. else return next(null, false);
  842. },
  843. (err, song, currentSongIndex) => {
  844. return next(
  845. err,
  846. song,
  847. currentSongIndex,
  848. station
  849. );
  850. }
  851. );
  852. }
  853. },
  854. (song, currentSongIndex, station, next) => {
  855. let $set = {};
  856. if (song === null) $set.currentSong = null;
  857. else if (song.likes === -1 && song.dislikes === -1) {
  858. $set.currentSong = {
  859. songId: song.songId,
  860. title: song.title,
  861. duration: song.duration,
  862. skipDuration: 0,
  863. likes: -1,
  864. dislikes: -1,
  865. };
  866. } else {
  867. $set.currentSong = {
  868. songId: song.songId,
  869. title: song.title,
  870. artists: song.artists,
  871. duration: song.duration,
  872. likes: song.likes,
  873. dislikes: song.dislikes,
  874. skipDuration: song.skipDuration,
  875. thumbnail: song.thumbnail,
  876. };
  877. }
  878. if (currentSongIndex >= 0)
  879. $set.currentSongIndex = currentSongIndex;
  880. $set.startedAt = Date.now();
  881. $set.timePaused = 0;
  882. if (station.paused) $set.pausedAt = Date.now();
  883. next(null, $set, station);
  884. },
  885. ($set, station, next) => {
  886. this.stationModel.updateOne(
  887. { _id: station._id },
  888. { $set },
  889. (err) => {
  890. this.runJob("UPDATE_STATION", {
  891. stationId: station._id,
  892. bypassQueue: payload.bypassQueue,
  893. })
  894. .then((station) => {
  895. if (
  896. station.type === "community" &&
  897. station.partyMode === true
  898. )
  899. this.cache
  900. .runJob("PUB", {
  901. channel:
  902. "station.queueUpdate",
  903. value: payload.stationId,
  904. })
  905. .then()
  906. .catch();
  907. next(null, station);
  908. })
  909. .catch(next);
  910. }
  911. );
  912. },
  913. ],
  914. async (err, station) => {
  915. if (err) {
  916. err = await this.utils.runJob("GET_ERROR", {
  917. error: err,
  918. });
  919. this.log(
  920. "ERROR",
  921. `Skipping station "${payload.stationId}" failed. "${err}"`
  922. );
  923. reject(new Error(err));
  924. } else {
  925. if (
  926. station.currentSong !== null &&
  927. station.currentSong.songId !== undefined
  928. ) {
  929. station.currentSong.skipVotes = 0;
  930. }
  931. //TODO Pub/Sub this
  932. this.utils
  933. .runJob("EMIT_TO_ROOM", {
  934. room: `station.${station._id}`,
  935. args: [
  936. "event:songs.next",
  937. {
  938. currentSong: station.currentSong,
  939. startedAt: station.startedAt,
  940. paused: station.paused,
  941. timePaused: 0,
  942. },
  943. ],
  944. })
  945. .then()
  946. .catch();
  947. if (station.privacy === "public") {
  948. this.utils
  949. .runJob("EMIT_TO_ROOM", {
  950. room: "home",
  951. args: [
  952. "event:station.nextSong",
  953. station._id,
  954. station.currentSong,
  955. ],
  956. })
  957. .then()
  958. .catch();
  959. } else {
  960. let sockets = await this.utils.runJob(
  961. "GET_ROOM_SOCKETS",
  962. { room: "home" }
  963. );
  964. for (let socketId in sockets) {
  965. let socket = sockets[socketId];
  966. let session = sockets[socketId].session;
  967. if (session.sessionId) {
  968. this.cache
  969. .runJob("HGET", {
  970. table: "sessions",
  971. key: session.sessionId,
  972. })
  973. .then((session) => {
  974. if (session) {
  975. this.db
  976. .runJob("GET_MODEL", {
  977. modelName: "user",
  978. })
  979. .then((userModel) => {
  980. userModel.findOne(
  981. {
  982. _id:
  983. session.userId,
  984. },
  985. (err, user) => {
  986. if (
  987. !err &&
  988. user
  989. ) {
  990. if (
  991. user.role ===
  992. "admin"
  993. )
  994. socket.emit(
  995. "event:station.nextSong",
  996. station._id,
  997. station.currentSong
  998. );
  999. else if (
  1000. station.type ===
  1001. "community" &&
  1002. station.owner ===
  1003. session.userId
  1004. )
  1005. socket.emit(
  1006. "event:station.nextSong",
  1007. station._id,
  1008. station.currentSong
  1009. );
  1010. }
  1011. }
  1012. );
  1013. });
  1014. }
  1015. });
  1016. }
  1017. }
  1018. }
  1019. if (
  1020. station.currentSong !== null &&
  1021. station.currentSong.songId !== undefined
  1022. ) {
  1023. this.utils.runJob("SOCKETS_JOIN_SONG_ROOM", {
  1024. sockets: await this.utils.runJob(
  1025. "GET_ROOM_SOCKETS",
  1026. { room: `station.${station._id}` }
  1027. ),
  1028. room: `song.${station.currentSong.songId}`,
  1029. });
  1030. if (!station.paused) {
  1031. this.notifications.runJob("SCHEDULE", {
  1032. name: `stations.nextSong?id=${station._id}`,
  1033. time: station.currentSong.duration * 1000,
  1034. station,
  1035. });
  1036. }
  1037. } else {
  1038. this.utils
  1039. .runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  1040. sockets: await this.utils.runJob(
  1041. "GET_ROOM_SOCKETS",
  1042. { room: `station.${station._id}` }
  1043. ),
  1044. })
  1045. .then()
  1046. .catch();
  1047. }
  1048. resolve({ station: station });
  1049. }
  1050. }
  1051. );
  1052. });
  1053. }
  1054. CAN_USER_VIEW_STATION(payload) {
  1055. // station, userId, cb
  1056. return new Promise((resolve, reject) => {
  1057. async.waterfall(
  1058. [
  1059. (next) => {
  1060. if (payload.station.privacy !== "private")
  1061. return next(true);
  1062. if (!payload.userId) return next("Not allowed");
  1063. next();
  1064. },
  1065. (next) => {
  1066. this.db
  1067. .runJob("GET_MODEL", {
  1068. modelName: "user",
  1069. })
  1070. .then((userModel) => {
  1071. userModel.findOne(
  1072. { _id: payload.userId },
  1073. next
  1074. );
  1075. });
  1076. },
  1077. (user, next) => {
  1078. if (!user) return next("Not allowed");
  1079. if (user.role === "admin") return next(true);
  1080. if (payload.station.type === "official")
  1081. return next("Not allowed");
  1082. if (payload.station.owner === payload.userId)
  1083. return next(true);
  1084. next("Not allowed");
  1085. },
  1086. ],
  1087. async (errOrResult) => {
  1088. if (errOrResult !== true && errOrResult !== "Not allowed") {
  1089. errOrResult = await this.utils.runJob("GET_ERROR", {
  1090. error: errOrResult,
  1091. });
  1092. reject(new Error(errOrResult));
  1093. } else {
  1094. resolve(errOrResult === true ? true : false);
  1095. }
  1096. }
  1097. );
  1098. });
  1099. }
  1100. }
  1101. module.exports = new StationsModule();