stations.js 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972
  1. import async from "async";
  2. import CoreClass from "../core";
  3. class StationsModule extends CoreClass {
  4. constructor() {
  5. super("stations");
  6. }
  7. async initialize() {
  8. this.cache = this.moduleManager.modules.cache;
  9. this.db = this.moduleManager.modules.db;
  10. this.utils = this.moduleManager.modules.utils;
  11. this.songs = this.moduleManager.modules.songs;
  12. this.notifications = this.moduleManager.modules.notifications;
  13. this.defaultSong = {
  14. songId: "60ItHLz5WEA",
  15. title: "Faded - Alan Walker",
  16. duration: 212,
  17. skipDuration: 0,
  18. likes: -1,
  19. dislikes: -1
  20. };
  21. // TEMP
  22. this.cache.runJob("SUB", {
  23. channel: "station.pause",
  24. cb: async stationId => {
  25. this.notifications
  26. .runJob("REMOVE", {
  27. subscription: `stations.nextSong?id=${stationId}`
  28. })
  29. .then();
  30. }
  31. });
  32. this.cache.runJob("SUB", {
  33. channel: "station.resume",
  34. cb: async stationId => {
  35. this.runJob("INITIALIZE_STATION", { stationId }).then();
  36. }
  37. });
  38. this.cache.runJob("SUB", {
  39. channel: "station.queueUpdate",
  40. cb: async stationId => {
  41. this.runJob("GET_STATION", { stationId }).then(station => {
  42. if (!station.currentSong && station.queue.length > 0) {
  43. this.runJob("INITIALIZE_STATION", {
  44. stationId
  45. }).then();
  46. }
  47. });
  48. }
  49. });
  50. this.cache.runJob("SUB", {
  51. channel: "station.newOfficialPlaylist",
  52. cb: async stationId => {
  53. this.cache
  54. .runJob("HGET", {
  55. table: "officialPlaylists",
  56. key: stationId
  57. })
  58. .then(playlistObj => {
  59. if (playlistObj) {
  60. this.utils.runJob("EMIT_TO_ROOM", {
  61. room: `station.${stationId}`,
  62. args: ["event:newOfficialPlaylist", playlistObj.songs]
  63. });
  64. }
  65. });
  66. }
  67. });
  68. const stationModel = (this.stationModel = await this.db.runJob("GET_MODEL", { modelName: "station" }));
  69. const stationSchema = (this.stationSchema = await this.cache.runJob("GET_SCHEMA", { schemaName: "station" }));
  70. return new Promise((resolve, reject) =>
  71. async.waterfall(
  72. [
  73. next => {
  74. this.setStage(2);
  75. this.cache
  76. .runJob("HGETALL", { table: "stations" })
  77. .then(stations => {
  78. next(null, stations);
  79. })
  80. .catch(next);
  81. },
  82. (stations, next) => {
  83. this.setStage(3);
  84. if (!stations) return next();
  85. const stationIds = Object.keys(stations);
  86. return async.each(
  87. stationIds,
  88. (stationId, next) => {
  89. stationModel.findOne({ _id: stationId }, (err, station) => {
  90. if (err) next(err);
  91. else if (!station) {
  92. this.cache
  93. .runJob("HDEL", {
  94. table: "stations",
  95. key: stationId
  96. })
  97. .then(() => {
  98. next();
  99. })
  100. .catch(next);
  101. } else next();
  102. });
  103. },
  104. next
  105. );
  106. },
  107. next => {
  108. this.setStage(4);
  109. stationModel.find({}, next);
  110. },
  111. (stations, next) => {
  112. this.setStage(5);
  113. async.each(
  114. stations,
  115. (station, next2) => {
  116. async.waterfall(
  117. [
  118. next => {
  119. this.cache
  120. .runJob("HSET", {
  121. table: "stations",
  122. key: station._id,
  123. value: stationSchema(station)
  124. })
  125. .then(station => next(null, station))
  126. .catch(next);
  127. },
  128. (station, next) => {
  129. this.runJob(
  130. "INITIALIZE_STATION",
  131. {
  132. stationId: station._id,
  133. bypassQueue: true
  134. },
  135. { bypassQueue: true }
  136. )
  137. .then(() => {
  138. next();
  139. })
  140. .catch(next); // bypassQueue is true because otherwise the module will never initialize
  141. }
  142. ],
  143. err => {
  144. next2(err);
  145. }
  146. );
  147. },
  148. next
  149. );
  150. }
  151. ],
  152. async err => {
  153. if (err) {
  154. err = await this.utils.runJob("GET_ERROR", {
  155. error: err
  156. });
  157. reject(new Error(err));
  158. } else {
  159. resolve();
  160. }
  161. }
  162. )
  163. );
  164. }
  165. INITIALIZE_STATION(payload) {
  166. // stationId, cb, bypassValidate = false
  167. return new Promise((resolve, reject) => {
  168. // if (typeof cb !== 'function') cb = ()=>{};
  169. async.waterfall(
  170. [
  171. next => {
  172. this.runJob(
  173. "GET_STATION",
  174. {
  175. stationId: payload.stationId,
  176. bypassQueue: payload.bypassQueue
  177. },
  178. { bypassQueue: payload.bypassQueue }
  179. )
  180. .then(station => {
  181. next(null, station);
  182. })
  183. .catch(next);
  184. },
  185. (station, next) => {
  186. if (!station) return next("Station not found.");
  187. this.notifications
  188. .runJob("UNSCHEDULE", {
  189. name: `stations.nextSong?id=${station._id}`
  190. })
  191. .then()
  192. .catch();
  193. this.notifications
  194. .runJob("SUBSCRIBE", {
  195. name: `stations.nextSong?id=${station._id}`,
  196. cb: () =>
  197. this.runJob("SKIP_STATION", {
  198. stationId: station._id
  199. }),
  200. unique: true,
  201. station
  202. })
  203. .then()
  204. .catch();
  205. if (station.paused) return next(true, station);
  206. return next(null, station);
  207. },
  208. (station, next) => {
  209. if (!station.currentSong) {
  210. return this.runJob(
  211. "SKIP_STATION",
  212. {
  213. stationId: station._id,
  214. bypassQueue: payload.bypassQueue
  215. },
  216. { bypassQueue: payload.bypassQueue }
  217. )
  218. .then(station => {
  219. next(true, station);
  220. })
  221. .catch(next)
  222. .finally(() => {});
  223. }
  224. let timeLeft =
  225. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  226. if (Number.isNaN(timeLeft)) timeLeft = -1;
  227. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  228. return this.runJob(
  229. "SKIP_STATION",
  230. {
  231. stationId: station._id,
  232. bypassQueue: payload.bypassQueue
  233. },
  234. { bypassQueue: payload.bypassQueue }
  235. )
  236. .then(station => {
  237. next(null, station);
  238. })
  239. .catch(next);
  240. }
  241. // name, time, cb, station
  242. this.notifications.runJob("SCHEDULE", {
  243. name: `stations.nextSong?id=${station._id}`,
  244. time: timeLeft,
  245. station
  246. });
  247. return next(null, station);
  248. }
  249. ],
  250. async (err, station) => {
  251. if (err && err !== true) {
  252. err = await this.utils.runJob("GET_ERROR", {
  253. error: err
  254. });
  255. reject(new Error(err));
  256. } else resolve(station);
  257. }
  258. );
  259. });
  260. }
  261. async CALCULATE_SONG_FOR_STATION(payload) {
  262. // station, bypassValidate = false
  263. const stationModel = await this.db.runJob("GET_MODEL", { modelName: "station" });
  264. const songModel = await this.db.runJob("GET_MODEL", { modelName: "song" });
  265. return new Promise((resolve, reject) => {
  266. const songList = [];
  267. return async.waterfall(
  268. [
  269. next => {
  270. if (payload.station.genres.length === 0) return next();
  271. const genresDone = [];
  272. return payload.station.genres.forEach(genre => {
  273. songModel.find({ genres: genre }, (err, songs) => {
  274. if (!err) {
  275. songs.forEach(song => {
  276. if (songList.indexOf(song._id) === -1) {
  277. let found = false;
  278. song.genres.forEach(songGenre => {
  279. if (payload.station.blacklistedGenres.indexOf(songGenre) !== -1)
  280. found = true;
  281. });
  282. if (!found) {
  283. songList.push(song._id);
  284. }
  285. }
  286. });
  287. }
  288. genresDone.push(genre);
  289. if (genresDone.length === payload.station.genres.length) next();
  290. });
  291. });
  292. },
  293. next => {
  294. const playlist = [];
  295. songList.forEach(songId => {
  296. if (payload.station.playlist.indexOf(songId) === -1) playlist.push(songId);
  297. });
  298. // eslint-disable-next-line array-callback-return
  299. payload.station.playlist.filter(songId => {
  300. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  301. });
  302. this.utils
  303. .runJob("SHUFFLE", { array: playlist })
  304. .then(result => {
  305. next(null, result.array);
  306. })
  307. .catch(next);
  308. },
  309. (playlist, next) => {
  310. this.runJob(
  311. "CALCULATE_OFFICIAL_PLAYLIST_LIST",
  312. {
  313. stationId: payload.station._id,
  314. songList: playlist,
  315. bypassQueue: payload.bypassQueue
  316. },
  317. { bypassQueue: payload.bypassQueue }
  318. )
  319. .then(() => {
  320. next(null, playlist);
  321. })
  322. .catch(next);
  323. },
  324. (playlist, next) => {
  325. stationModel.updateOne(
  326. { _id: payload.station._id },
  327. { $set: { playlist } },
  328. { runValidators: true },
  329. () => {
  330. this.runJob(
  331. "UPDATE_STATION",
  332. {
  333. stationId: payload.station._id,
  334. bypassQueue: payload.bypassQueue
  335. },
  336. { bypassQueue: payload.bypassQueue }
  337. )
  338. .then(() => {
  339. next(null, playlist);
  340. })
  341. .catch(next);
  342. }
  343. );
  344. }
  345. ],
  346. (err, newPlaylist) => {
  347. if (err) return reject(new Error(err));
  348. return resolve(newPlaylist);
  349. }
  350. );
  351. });
  352. }
  353. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  354. GET_STATION(payload) {
  355. // stationId, cb, bypassValidate = false
  356. return new Promise((resolve, reject) => {
  357. async.waterfall(
  358. [
  359. next => {
  360. this.cache
  361. .runJob("HGET", {
  362. table: "stations",
  363. key: payload.stationId
  364. })
  365. .then(station => {
  366. next(null, station);
  367. })
  368. .catch(next);
  369. },
  370. (station, next) => {
  371. if (station) return next(true, station);
  372. return this.stationModel.findOne({ _id: payload.stationId }, next);
  373. },
  374. (station, next) => {
  375. if (station) {
  376. if (station.type === "official") {
  377. this.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  378. stationId: station._id,
  379. songList: station.playlist
  380. })
  381. .then()
  382. .catch();
  383. }
  384. station = this.stationSchema(station);
  385. this.cache
  386. .runJob("HSET", {
  387. table: "stations",
  388. key: payload.stationId,
  389. value: station
  390. })
  391. .then()
  392. .catch();
  393. next(true, station);
  394. } else next("Station not found");
  395. }
  396. ],
  397. async (err, station) => {
  398. if (err && err !== true) {
  399. err = await this.utils.runJob("GET_ERROR", {
  400. error: err
  401. });
  402. reject(new Error(err));
  403. } else resolve(station);
  404. }
  405. );
  406. });
  407. }
  408. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  409. async GET_STATION_BY_NAME(payload) {
  410. // stationName
  411. const stationModel = await this.db.runJob("GET_MODEL", { modelName: "station" });
  412. return new Promise((resolve, reject) =>
  413. async.waterfall(
  414. [
  415. next => {
  416. stationModel.findOne({ name: payload.stationName }, next);
  417. },
  418. (station, next) => {
  419. if (station) {
  420. if (station.type === "official") {
  421. this.runJob("CALCULATE_OFFICIAL_PLAYLIST_LIST", {
  422. stationId: station._id,
  423. songList: station.playlist
  424. });
  425. }
  426. this.cache.runJob("GET_SCHEMA", { schemaName: "station" }).then(stationSchema => {
  427. station = stationSchema(station);
  428. this.cache.runJob("HSET", {
  429. table: "stations",
  430. key: station._id,
  431. value: station
  432. });
  433. next(true, station);
  434. });
  435. } else next("Station not found");
  436. }
  437. ],
  438. (err, station) => {
  439. if (err && err !== true) return reject(new Error(err));
  440. return resolve(station);
  441. }
  442. )
  443. );
  444. }
  445. UPDATE_STATION(payload) {
  446. // stationId, cb, bypassValidate = false
  447. return new Promise((resolve, reject) => {
  448. async.waterfall(
  449. [
  450. next => {
  451. this.stationModel.findOne({ _id: payload.stationId }, next);
  452. },
  453. (station, next) => {
  454. if (!station) {
  455. this.cache
  456. .runJob("HDEL", {
  457. table: "stations",
  458. key: payload.stationId
  459. })
  460. .then()
  461. .catch();
  462. return next("Station not found");
  463. }
  464. return this.cache
  465. .runJob("HSET", {
  466. table: "stations",
  467. key: payload.stationId,
  468. value: station
  469. })
  470. .then(station => {
  471. next(null, station);
  472. })
  473. .catch(next);
  474. }
  475. ],
  476. async (err, station) => {
  477. if (err && err !== true) {
  478. err = await this.utils.runJob("GET_ERROR", {
  479. error: err
  480. });
  481. reject(new Error(err));
  482. } else resolve(station);
  483. }
  484. );
  485. });
  486. }
  487. async CALCULATE_OFFICIAL_PLAYLIST_LIST(payload) {
  488. // stationId, songList, cb, bypassValidate = false
  489. const officialPlaylistSchema = await this.cache.runJob("GET_SCHEMA", { schemaName: "officialPlaylist" });
  490. return new Promise(resolve => {
  491. const lessInfoPlaylist = [];
  492. return async.each(
  493. payload.songList,
  494. (song, next) => {
  495. this.songs
  496. .runJob("GET_SONG", { id: song })
  497. .then(response => {
  498. const { song } = response;
  499. if (song) {
  500. const newSong = {
  501. songId: song.songId,
  502. title: song.title,
  503. artists: song.artists,
  504. duration: song.duration
  505. };
  506. lessInfoPlaylist.push(newSong);
  507. }
  508. })
  509. .finally(() => {
  510. next();
  511. });
  512. },
  513. () => {
  514. this.cache
  515. .runJob("HSET", {
  516. table: "officialPlaylists",
  517. key: payload.stationId,
  518. value: officialPlaylistSchema(payload.stationId, lessInfoPlaylist)
  519. })
  520. .finally(() => {
  521. this.cache.runJob("PUB", {
  522. channel: "station.newOfficialPlaylist",
  523. value: payload.stationId
  524. });
  525. resolve();
  526. });
  527. }
  528. );
  529. });
  530. }
  531. SKIP_STATION(payload) {
  532. // stationId
  533. return new Promise((resolve, reject) => {
  534. this.log("INFO", `Skipping station ${payload.stationId}.`);
  535. this.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  536. async.waterfall(
  537. [
  538. next => {
  539. this.runJob(
  540. "GET_STATION",
  541. {
  542. stationId: payload.stationId,
  543. bypassQueue: payload.bypassQueue
  544. },
  545. { bypassQueue: payload.bypassQueue }
  546. )
  547. .then(station => {
  548. next(null, station);
  549. })
  550. .catch(() => {});
  551. },
  552. // eslint-disable-next-line consistent-return
  553. (station, next) => {
  554. if (!station) return next("Station not found.");
  555. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  556. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  557. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  558. // Community station with party mode enabled and songs in the queue
  559. if (station.paused) return next(null, null, -19, station);
  560. return this.stationModel.updateOne(
  561. { _id: payload.stationId },
  562. {
  563. $pull: {
  564. queue: {
  565. _id: station.queue[0]._id
  566. }
  567. }
  568. },
  569. err => {
  570. if (err) return next(err);
  571. return next(null, station.queue[0], -12, station);
  572. }
  573. );
  574. }
  575. if (station.type === "community" && !station.partyMode) {
  576. return this.db.runJob("GET_MODEL", { modelName: "playlist" }).then(playlistModel =>
  577. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  578. if (err) return next(err);
  579. if (!playlist) return next(null, null, -13, station);
  580. playlist = playlist.songs;
  581. if (playlist.length > 0) {
  582. let currentSongIndex;
  583. if (station.currentSongIndex < playlist.length - 1)
  584. currentSongIndex = station.currentSongIndex + 1;
  585. else currentSongIndex = 0;
  586. const callback = (err, song) => {
  587. if (err) return next(err);
  588. if (song) return next(null, song, currentSongIndex, station);
  589. const currentSong = {
  590. songId: playlist[currentSongIndex].songId,
  591. title: playlist[currentSongIndex].title,
  592. duration: playlist[currentSongIndex].duration,
  593. likes: -1,
  594. dislikes: -1
  595. };
  596. return next(null, currentSong, currentSongIndex, station);
  597. };
  598. if (playlist[currentSongIndex]._id)
  599. return this.songs
  600. .runJob("GET_SONG", {
  601. id: playlist[currentSongIndex]._id
  602. })
  603. .then(response => callback(null, response.song))
  604. .catch(callback);
  605. return this.songs
  606. .runJob("GET_SONG_FROM_ID", {
  607. songId: playlist[currentSongIndex].songId
  608. })
  609. .then(response => callback(null, response.song))
  610. .catch(callback);
  611. }
  612. return next(null, null, -14, station);
  613. })
  614. );
  615. }
  616. if (station.type === "official" && station.playlist.length === 0) {
  617. return this.runJob(
  618. "CALCULATE_SONG_FOR_STATION",
  619. { station, bypassQueue: payload.bypassQueue },
  620. { bypassQueue: payload.bypassQueue }
  621. )
  622. .then(playlist => {
  623. if (playlist.length === 0) return next(null, this.defaultSong, 0, station);
  624. return this.songs
  625. .runJob("GET_SONG", {
  626. id: playlist[0]
  627. })
  628. .then(response => {
  629. next(null, response.song, 0, station);
  630. })
  631. .catch(() => next(null, this.defaultSong, 0, station));
  632. })
  633. .catch(next);
  634. }
  635. if (station.type === "official" && station.playlist.length > 0) {
  636. return async.doUntil(
  637. next => {
  638. if (station.currentSongIndex < station.playlist.length - 1) {
  639. this.songs
  640. .runJob("GET_SONG", {
  641. id: station.playlist[station.currentSongIndex + 1]
  642. })
  643. .then(response => next(null, response.song, station.currentSongIndex + 1))
  644. .catch(() => {
  645. station.currentSongIndex += 1;
  646. next(null, null, null);
  647. });
  648. } else {
  649. this.runJob(
  650. "CALCULATE_SONG_FOR_STATION",
  651. {
  652. station,
  653. bypassQueue: payload.bypassQueue
  654. },
  655. { bypassQueue: payload.bypassQueue }
  656. )
  657. .then(newPlaylist => {
  658. this.songs
  659. .runJob("GET_SONG", { id: newPlaylist[0] })
  660. .then(response => {
  661. station.playlist = newPlaylist;
  662. next(null, response.song, 0);
  663. })
  664. .catch(() => next(null, this.defaultSong, 0));
  665. })
  666. .catch(() => {
  667. next(null, this.defaultSong, 0);
  668. });
  669. }
  670. },
  671. (song, currentSongIndex, next) => {
  672. if (song) return next(null, true, currentSongIndex);
  673. return next(null, false);
  674. },
  675. (err, song, currentSongIndex) => next(err, song, currentSongIndex, station)
  676. );
  677. }
  678. },
  679. (song, currentSongIndex, station, next) => {
  680. const $set = {};
  681. if (song === null) $set.currentSong = null;
  682. else if (song.likes === -1 && song.dislikes === -1) {
  683. $set.currentSong = {
  684. songId: song.songId,
  685. title: song.title,
  686. duration: song.duration,
  687. skipDuration: 0,
  688. likes: -1,
  689. dislikes: -1
  690. };
  691. } else {
  692. $set.currentSong = {
  693. songId: song.songId,
  694. title: song.title,
  695. artists: song.artists,
  696. duration: song.duration,
  697. likes: song.likes,
  698. dislikes: song.dislikes,
  699. skipDuration: song.skipDuration,
  700. thumbnail: song.thumbnail
  701. };
  702. }
  703. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  704. $set.startedAt = Date.now();
  705. $set.timePaused = 0;
  706. if (station.paused) $set.pausedAt = Date.now();
  707. next(null, $set, station);
  708. },
  709. ($set, station, next) => {
  710. this.stationModel.updateOne({ _id: station._id }, { $set }, () => {
  711. this.runJob(
  712. "UPDATE_STATION",
  713. {
  714. stationId: station._id,
  715. bypassQueue: payload.bypassQueue
  716. },
  717. { bypassQueue: payload.bypassQueue }
  718. )
  719. .then(station => {
  720. if (station.type === "community" && station.partyMode === true)
  721. this.cache
  722. .runJob("PUB", {
  723. channel: "station.queueUpdate",
  724. value: payload.stationId
  725. })
  726. .then()
  727. .catch();
  728. next(null, station);
  729. })
  730. .catch(next);
  731. });
  732. }
  733. ],
  734. async (err, station) => {
  735. if (err) {
  736. err = await this.utils.runJob("GET_ERROR", {
  737. error: err
  738. });
  739. this.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  740. reject(new Error(err));
  741. } else {
  742. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  743. station.currentSong.skipVotes = 0;
  744. }
  745. // TODO Pub/Sub this
  746. this.utils
  747. .runJob("EMIT_TO_ROOM", {
  748. room: `station.${station._id}`,
  749. args: [
  750. "event:songs.next",
  751. {
  752. currentSong: station.currentSong,
  753. startedAt: station.startedAt,
  754. paused: station.paused,
  755. timePaused: 0
  756. }
  757. ]
  758. })
  759. .then()
  760. .catch();
  761. if (station.privacy === "public") {
  762. this.utils
  763. .runJob("EMIT_TO_ROOM", {
  764. room: "home",
  765. args: ["event:station.nextSong", station._id, station.currentSong]
  766. })
  767. .then()
  768. .catch();
  769. } else {
  770. const sockets = await this.utils.runJob("GET_ROOM_SOCKETS", { room: "home" });
  771. for (
  772. let socketId = 0, socketKeys = Object.keys(sockets);
  773. socketId < socketKeys.length;
  774. socketId += 1
  775. ) {
  776. const socket = sockets[socketId];
  777. const { session } = sockets[socketId];
  778. if (session.sessionId) {
  779. this.cache
  780. .runJob("HGET", {
  781. table: "sessions",
  782. key: session.sessionId
  783. })
  784. .then(session => {
  785. if (session) {
  786. this.db.runJob("GET_MODEL", { modelName: "user" }).then(userModel => {
  787. userModel.findOne(
  788. {
  789. _id: session.userId
  790. },
  791. (err, user) => {
  792. if (!err && user) {
  793. if (user.role === "admin")
  794. socket.emit(
  795. "event:station.nextSong",
  796. station._id,
  797. station.currentSong
  798. );
  799. else if (
  800. station.type === "community" &&
  801. station.owner === session.userId
  802. )
  803. socket.emit(
  804. "event:station.nextSong",
  805. station._id,
  806. station.currentSong
  807. );
  808. }
  809. }
  810. );
  811. });
  812. }
  813. });
  814. }
  815. }
  816. }
  817. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  818. this.utils.runJob("SOCKETS_JOIN_SONG_ROOM", {
  819. sockets: await this.utils.runJob("GET_ROOM_SOCKETS", {
  820. room: `station.${station._id}`
  821. }),
  822. room: `song.${station.currentSong.songId}`
  823. });
  824. if (!station.paused) {
  825. this.notifications.runJob("SCHEDULE", {
  826. name: `stations.nextSong?id=${station._id}`,
  827. time: station.currentSong.duration * 1000,
  828. station
  829. });
  830. }
  831. } else {
  832. this.utils
  833. .runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  834. sockets: await this.utils.runJob("GET_ROOM_SOCKETS", {
  835. room: `station.${station._id}`
  836. })
  837. })
  838. .then()
  839. .catch();
  840. }
  841. resolve({ station });
  842. }
  843. }
  844. );
  845. });
  846. }
  847. CAN_USER_VIEW_STATION(payload) {
  848. // station, userId, hideUnlisted
  849. return new Promise((resolve, reject) => {
  850. async.waterfall(
  851. [
  852. next => {
  853. if (payload.station.privacy === "public") return next(true);
  854. if (payload.station.privacy === "unlisted")
  855. if (payload.hideUnlisted === true) return next();
  856. else return next(true);
  857. if (!payload.userId) return next("Not allowed");
  858. return next();
  859. },
  860. next => {
  861. this.db
  862. .runJob("GET_MODEL", {
  863. modelName: "user"
  864. })
  865. .then(userModel => {
  866. userModel.findOne({ _id: payload.userId }, next);
  867. });
  868. },
  869. (user, next) => {
  870. if (!user) return next("Not allowed");
  871. if (user.role === "admin") return next(true);
  872. if (payload.station.type === "official") return next("Not allowed");
  873. if (payload.station.owner === payload.userId) return next(true);
  874. return next("Not allowed");
  875. }
  876. ],
  877. async errOrResult => {
  878. if (errOrResult !== true && errOrResult !== "Not allowed") {
  879. errOrResult = await this.utils.runJob("GET_ERROR", {
  880. error: errOrResult
  881. });
  882. reject(new Error(errOrResult));
  883. } else {
  884. resolve(errOrResult === true);
  885. }
  886. }
  887. );
  888. });
  889. }
  890. }
  891. export default new StationsModule();