stations.js 38 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552
  1. import async from "async";
  2. import mongoose from "mongoose";
  3. import CoreClass from "../core";
  4. let StationsModule;
  5. let CacheModule;
  6. let DBModule;
  7. let UtilsModule;
  8. let IOModule;
  9. let SongsModule;
  10. let PlaylistsModule;
  11. let NotificationsModule;
  12. class _StationsModule extends CoreClass {
  13. // eslint-disable-next-line require-jsdoc
  14. constructor() {
  15. super("stations");
  16. StationsModule = this;
  17. }
  18. /**
  19. * Initialises the stations module
  20. *
  21. * @returns {Promise} - returns promise (reject, resolve)
  22. */
  23. async initialize() {
  24. CacheModule = this.moduleManager.modules.cache;
  25. DBModule = this.moduleManager.modules.db;
  26. UtilsModule = this.moduleManager.modules.utils;
  27. IOModule = this.moduleManager.modules.io;
  28. SongsModule = this.moduleManager.modules.songs;
  29. PlaylistsModule = this.moduleManager.modules.playlists;
  30. NotificationsModule = this.moduleManager.modules.notifications;
  31. this.defaultSong = {
  32. songId: "60ItHLz5WEA",
  33. title: "Faded - Alan Walker",
  34. duration: 212,
  35. skipDuration: 0,
  36. likes: -1,
  37. dislikes: -1,
  38. requestedAt: Date.now()
  39. };
  40. this.userList = {};
  41. this.usersPerStation = {};
  42. this.usersPerStationCount = {};
  43. // TEMP
  44. CacheModule.runJob("SUB", {
  45. channel: "station.pause",
  46. cb: async stationId => {
  47. NotificationsModule.runJob("REMOVE", {
  48. subscription: `stations.nextSong?id=${stationId}`
  49. }).then();
  50. }
  51. });
  52. CacheModule.runJob("SUB", {
  53. channel: "station.resume",
  54. cb: async stationId => {
  55. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  56. }
  57. });
  58. CacheModule.runJob("SUB", {
  59. channel: "station.queueUpdate",
  60. cb: async stationId => {
  61. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  62. if (!station.currentSong && station.queue.length > 0) {
  63. StationsModule.runJob("INITIALIZE_STATION", {
  64. stationId
  65. }).then();
  66. }
  67. });
  68. }
  69. });
  70. CacheModule.runJob("SUB", {
  71. channel: "station.newOfficialPlaylist",
  72. cb: async stationId => {
  73. CacheModule.runJob("HGET", {
  74. table: "officialPlaylists",
  75. key: stationId
  76. }).then(playlistObj => {
  77. if (playlistObj) {
  78. IOModule.runJob("EMIT_TO_ROOM", {
  79. room: `station.${stationId}`,
  80. args: ["event:newOfficialPlaylist", playlistObj.songs]
  81. });
  82. }
  83. });
  84. }
  85. });
  86. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  87. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  88. return new Promise((resolve, reject) =>
  89. async.waterfall(
  90. [
  91. next => {
  92. this.setStage(2);
  93. CacheModule.runJob("HGETALL", { table: "stations" })
  94. .then(stations => {
  95. next(null, stations);
  96. })
  97. .catch(next);
  98. },
  99. (stations, next) => {
  100. this.setStage(3);
  101. if (!stations) return next();
  102. const stationIds = Object.keys(stations);
  103. return async.each(
  104. stationIds,
  105. (stationId, next) => {
  106. stationModel.findOne({ _id: stationId }, (err, station) => {
  107. if (err) next(err);
  108. else if (!station) {
  109. CacheModule.runJob("HDEL", {
  110. table: "stations",
  111. key: stationId
  112. })
  113. .then(() => {
  114. next();
  115. })
  116. .catch(next);
  117. } else next();
  118. });
  119. },
  120. next
  121. );
  122. },
  123. next => {
  124. this.setStage(4);
  125. stationModel.find({}, next);
  126. },
  127. (stations, next) => {
  128. this.setStage(5);
  129. async.each(
  130. stations,
  131. (station, next2) => {
  132. async.waterfall(
  133. [
  134. next => {
  135. CacheModule.runJob("HSET", {
  136. table: "stations",
  137. key: station._id,
  138. value: stationSchema(station)
  139. })
  140. .then(station => next(null, station))
  141. .catch(next);
  142. },
  143. (station, next) => {
  144. StationsModule.runJob(
  145. "INITIALIZE_STATION",
  146. {
  147. stationId: station._id
  148. },
  149. null,
  150. -1
  151. )
  152. .then(() => {
  153. next();
  154. })
  155. .catch(next);
  156. }
  157. ],
  158. err => {
  159. next2(err);
  160. }
  161. );
  162. },
  163. next
  164. );
  165. }
  166. ],
  167. async err => {
  168. if (err) {
  169. err = await UtilsModule.runJob("GET_ERROR", {
  170. error: err
  171. });
  172. reject(new Error(err));
  173. } else {
  174. resolve();
  175. }
  176. }
  177. )
  178. );
  179. }
  180. /**
  181. * Initialises a station
  182. *
  183. * @param {object} payload - object that contains the payload
  184. * @param {string} payload.stationId - id of the station to initialise
  185. * @returns {Promise} - returns a promise (resolve, reject)
  186. */
  187. INITIALIZE_STATION(payload) {
  188. return new Promise((resolve, reject) => {
  189. // if (typeof cb !== 'function') cb = ()=>{};
  190. async.waterfall(
  191. [
  192. next => {
  193. StationsModule.runJob(
  194. "GET_STATION",
  195. {
  196. stationId: payload.stationId
  197. },
  198. this
  199. )
  200. .then(station => {
  201. next(null, station);
  202. })
  203. .catch(next);
  204. },
  205. (station, next) => {
  206. if (!station) return next("Station not found.");
  207. return NotificationsModule.runJob(
  208. "UNSCHEDULE",
  209. {
  210. name: `stations.nextSong?id=${station._id}`
  211. },
  212. this
  213. )
  214. .then()
  215. .catch()
  216. .finally(() => {
  217. NotificationsModule.runJob("SUBSCRIBE", {
  218. name: `stations.nextSong?id=${station._id}`,
  219. cb: () =>
  220. StationsModule.runJob("SKIP_STATION", {
  221. stationId: station._id
  222. }),
  223. unique: true,
  224. station
  225. })
  226. .then()
  227. .catch();
  228. if (station.paused) return next(true, station);
  229. return next(null, station);
  230. });
  231. },
  232. (station, next) => {
  233. if (!station.currentSong) {
  234. return StationsModule.runJob(
  235. "SKIP_STATION",
  236. {
  237. stationId: station._id
  238. },
  239. this
  240. )
  241. .then(station => {
  242. next(true, station);
  243. })
  244. .catch(next)
  245. .finally(() => {});
  246. }
  247. let timeLeft =
  248. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  249. if (Number.isNaN(timeLeft)) timeLeft = -1;
  250. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  251. return StationsModule.runJob(
  252. "SKIP_STATION",
  253. {
  254. stationId: station._id
  255. },
  256. this
  257. )
  258. .then(station => {
  259. next(null, station);
  260. })
  261. .catch(next);
  262. }
  263. // name, time, cb, station
  264. NotificationsModule.runJob("SCHEDULE", {
  265. name: `stations.nextSong?id=${station._id}`,
  266. time: timeLeft,
  267. station
  268. });
  269. return next(null, station);
  270. }
  271. ],
  272. async (err, station) => {
  273. if (err && err !== true) {
  274. err = await UtilsModule.runJob(
  275. "GET_ERROR",
  276. {
  277. error: err
  278. },
  279. this
  280. );
  281. reject(new Error(err));
  282. } else resolve(station);
  283. }
  284. );
  285. });
  286. }
  287. /**
  288. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  289. *
  290. * @param {object} payload - object that contains the payload
  291. * @param {string} payload.stationId - id of the station
  292. * @returns {Promise} - returns a promise (resolve, reject)
  293. */
  294. GET_STATION(payload) {
  295. return new Promise((resolve, reject) => {
  296. async.waterfall(
  297. [
  298. next => {
  299. CacheModule.runJob(
  300. "HGET",
  301. {
  302. table: "stations",
  303. key: payload.stationId
  304. },
  305. this
  306. )
  307. .then(station => {
  308. next(null, station);
  309. })
  310. .catch(next);
  311. },
  312. (station, next) => {
  313. if (station) return next(true, station);
  314. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  315. },
  316. (station, next) => {
  317. if (station) {
  318. station = StationsModule.stationSchema(station);
  319. CacheModule.runJob("HSET", {
  320. table: "stations",
  321. key: payload.stationId,
  322. value: station
  323. })
  324. .then()
  325. .catch();
  326. next(true, station);
  327. } else next("Station not found");
  328. }
  329. ],
  330. async (err, station) => {
  331. if (err && err !== true) {
  332. err = await UtilsModule.runJob(
  333. "GET_ERROR",
  334. {
  335. error: err
  336. },
  337. this
  338. );
  339. reject(new Error(err));
  340. } else resolve(station);
  341. }
  342. );
  343. });
  344. }
  345. /**
  346. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  347. *
  348. * @param {object} payload - object that contains the payload
  349. * @param {string} payload.stationName - the unique name of the station
  350. * @returns {Promise} - returns a promise (resolve, reject)
  351. */
  352. async GET_STATION_BY_NAME(payload) {
  353. return new Promise((resolve, reject) =>
  354. async.waterfall(
  355. [
  356. next => {
  357. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  358. },
  359. (station, next) => {
  360. if (station) {
  361. station = StationsModule.stationSchema(station);
  362. CacheModule.runJob("HSET", {
  363. table: "stations",
  364. key: station._id,
  365. value: station
  366. });
  367. next(true, station);
  368. } else next("Station not found");
  369. }
  370. ],
  371. (err, station) => {
  372. if (err && err !== true) return reject(new Error(err));
  373. return resolve(station);
  374. }
  375. )
  376. );
  377. }
  378. /**
  379. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  380. *
  381. * @param {object} payload - object that contains the payload
  382. * @param {string} payload.stationId - the id of the station to update
  383. * @returns {Promise} - returns a promise (resolve, reject)
  384. */
  385. UPDATE_STATION(payload) {
  386. return new Promise((resolve, reject) => {
  387. async.waterfall(
  388. [
  389. next => {
  390. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  391. },
  392. (station, next) => {
  393. if (!station) {
  394. CacheModule.runJob("HDEL", {
  395. table: "stations",
  396. key: payload.stationId
  397. })
  398. .then()
  399. .catch();
  400. return next("Station not found");
  401. }
  402. return CacheModule.runJob(
  403. "HSET",
  404. {
  405. table: "stations",
  406. key: payload.stationId,
  407. value: station
  408. },
  409. this
  410. )
  411. .then(station => {
  412. next(null, station);
  413. })
  414. .catch(next);
  415. }
  416. ],
  417. async (err, station) => {
  418. if (err && err !== true) {
  419. err = await UtilsModule.runJob(
  420. "GET_ERROR",
  421. {
  422. error: err
  423. },
  424. this
  425. );
  426. reject(new Error(err));
  427. } else resolve(station);
  428. }
  429. );
  430. });
  431. }
  432. /**
  433. * Fills up the official station playlist queue using the songs from the official station playlist
  434. *
  435. * @param {object} payload - object that contains the payload
  436. * @param {string} payload.stationId - the id of the station
  437. * @returns {Promise} - returns a promise (resolve, reject)
  438. */
  439. FILL_UP_OFFICIAL_STATION_PLAYLIST_QUEUE(payload) {
  440. return new Promise((resolve, reject) => {
  441. const { stationId } = payload;
  442. async.waterfall(
  443. [
  444. next => {
  445. PlaylistsModule.runJob("GET_STATION_PLAYLIST", { stationId, includeSongs: true }, this)
  446. .then(response => {
  447. next(null, response.playlist);
  448. })
  449. .catch(next);
  450. },
  451. (playlist, next) => {
  452. UtilsModule.runJob("SHUFFLE", { array: playlist.songs }, this)
  453. .then(response => {
  454. next(null, response.array);
  455. })
  456. .catch(next);
  457. },
  458. (playlistSongs, next) => {
  459. StationsModule.runJob("GET_STATION", { stationId }, this)
  460. .then(station => {
  461. next(null, playlistSongs, station);
  462. })
  463. .catch(next);
  464. },
  465. (playlistSongs, station, next) => {
  466. const songsStillNeeded = 50 - station.playlist.length;
  467. const currentSongs = station.playlist;
  468. const currentSongIds = station.playlist.map(song => song._id);
  469. const songsToAdd = [];
  470. playlistSongs
  471. .map(song => song._doc)
  472. .forEach(song => {
  473. if (
  474. songsToAdd.length < songsStillNeeded &&
  475. currentSongIds.indexOf(song._id.toString()) === -1
  476. )
  477. songsToAdd.push(song);
  478. });
  479. next(null, [...currentSongs, ...songsToAdd]);
  480. },
  481. (newPlaylist, next) => {
  482. StationsModule.stationModel.updateOne(
  483. { _id: stationId },
  484. { $set: { playlist: newPlaylist } },
  485. { runValidators: true },
  486. () => {
  487. StationsModule.runJob(
  488. "UPDATE_STATION",
  489. {
  490. stationId
  491. },
  492. this
  493. )
  494. .then(() => {
  495. next(null);
  496. })
  497. .catch(next);
  498. }
  499. );
  500. }
  501. ],
  502. err => {
  503. if (err) reject(err);
  504. else resolve();
  505. }
  506. );
  507. });
  508. }
  509. /**
  510. * Gets next official station song
  511. *
  512. * @param {object} payload - object that contains the payload
  513. * @param {string} payload.stationId - the id of the station
  514. * @returns {Promise} - returns a promise (resolve, reject)
  515. */
  516. GET_NEXT_OFFICIAL_STATION_SONG(payload) {
  517. return new Promise((resolve, reject) => {
  518. const { stationId } = payload;
  519. async.waterfall(
  520. [
  521. next => {
  522. StationsModule.runJob("GET_STATION", { stationId }, this)
  523. .then(station => {
  524. next(null, station);
  525. })
  526. .catch(next);
  527. },
  528. (station, next) => {
  529. if (station.playlist.length === 0) next("No songs available.");
  530. else {
  531. next(null, station.playlist[0]);
  532. }
  533. },
  534. (song, next) => {
  535. console.log(44444, song, song._id);
  536. SongsModule.runJob("GET_SONG", { id: song._id }, this)
  537. .then(response => {
  538. const { song } = response;
  539. if (song) {
  540. const newSong = {
  541. _id: song._id,
  542. songId: song.songId,
  543. title: song.title,
  544. artists: song.artists,
  545. duration: song.duration,
  546. thumbnail: song.thumbnail,
  547. requestedAt: song.requestedAt
  548. };
  549. next(null, newSong);
  550. } else {
  551. next(null, song);
  552. }
  553. })
  554. .catch(next);
  555. }
  556. ],
  557. (err, song) => {
  558. if (err) console.log(33333, err, payload);
  559. if (err) reject(err);
  560. else resolve({ song });
  561. }
  562. );
  563. });
  564. }
  565. /**
  566. * Removes first official playlist queue song
  567. *
  568. * @param {object} payload - object that contains the payload
  569. * @param {string} payload.stationId - the id of the station
  570. * @returns {Promise} - returns a promise (resolve, reject)
  571. */
  572. REMOVE_FIRST_OFFICIAL_PLAYLIST_QUEUE_SONG(payload) {
  573. return new Promise((resolve, reject) => {
  574. const { stationId } = payload;
  575. async.waterfall(
  576. [
  577. next => {
  578. StationsModule.stationModel.updateOne(
  579. { _id: stationId },
  580. { $pop: { playlist: -1 } },
  581. { runValidators: true },
  582. err => {
  583. if (err) next(err);
  584. else
  585. StationsModule.runJob(
  586. "UPDATE_STATION",
  587. {
  588. stationId
  589. },
  590. this
  591. )
  592. .then(() => {
  593. next(null);
  594. })
  595. .catch(next);
  596. }
  597. );
  598. }
  599. ],
  600. err => {
  601. if (err) reject(err);
  602. else resolve();
  603. }
  604. );
  605. });
  606. }
  607. /**
  608. * Skips a station
  609. *
  610. * @param {object} payload - object that contains the payload
  611. * @param {string} payload.stationId - the id of the station to skip
  612. * @returns {Promise} - returns a promise (resolve, reject)
  613. */
  614. SKIP_STATION(payload) {
  615. return new Promise((resolve, reject) => {
  616. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  617. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  618. async.waterfall(
  619. [
  620. next => {
  621. NotificationsModule.runJob("UNSCHEDULE", {
  622. name: `stations.nextSong?id=${payload.stationId}`
  623. })
  624. .then(() => {
  625. next();
  626. })
  627. .catch(next);
  628. },
  629. next => {
  630. StationsModule.runJob(
  631. "GET_STATION",
  632. {
  633. stationId: payload.stationId
  634. },
  635. this
  636. )
  637. .then(station => {
  638. next(null, station);
  639. })
  640. .catch(() => {});
  641. },
  642. // eslint-disable-next-line consistent-return
  643. (station, next) => {
  644. if (!station) return next("Station not found.");
  645. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  646. return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  647. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  648. // Community station with party mode enabled and songs in the queue
  649. if (station.paused) return next(null, null, -19, station);
  650. return StationsModule.stationModel.updateOne(
  651. { _id: payload.stationId },
  652. {
  653. $pull: {
  654. queue: {
  655. _id: station.queue[0]._id
  656. }
  657. }
  658. },
  659. err => {
  660. if (err) return next(err);
  661. return next(null, station.queue[0], -12, station);
  662. }
  663. );
  664. }
  665. if (station.type === "community" && !station.partyMode) {
  666. return DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel =>
  667. playlistModel.findOne({ _id: station.privatePlaylist }, (err, playlist) => {
  668. if (err) return next(err);
  669. if (!playlist) return next(null, null, -13, station);
  670. playlist = playlist.songs;
  671. if (playlist.length > 0) {
  672. let currentSongIndex;
  673. if (station.currentSongIndex < playlist.length - 1)
  674. currentSongIndex = station.currentSongIndex + 1;
  675. else currentSongIndex = 0;
  676. const callback = (err, song) => {
  677. if (err) return next(err);
  678. if (song) return next(null, song, currentSongIndex, station);
  679. const currentSong = {
  680. songId: playlist[currentSongIndex].songId,
  681. title: playlist[currentSongIndex].title,
  682. duration: playlist[currentSongIndex].duration,
  683. likes: -1,
  684. dislikes: -1,
  685. requestedAt: playlist[currentSongIndex].requestedAt
  686. };
  687. return next(null, currentSong, currentSongIndex, station);
  688. };
  689. if (mongoose.Types.ObjectId.isValid(playlist[currentSongIndex]._id))
  690. return SongsModule.runJob(
  691. "GET_SONG",
  692. {
  693. id: playlist[currentSongIndex]._id
  694. },
  695. this
  696. )
  697. .then(response => callback(null, response.song))
  698. .catch(callback);
  699. return SongsModule.runJob(
  700. "GET_SONG_FROM_ID",
  701. {
  702. songId: playlist[currentSongIndex].songId
  703. },
  704. this
  705. )
  706. .then(response => callback(null, response.song))
  707. .catch(callback);
  708. }
  709. return next(null, null, -14, station);
  710. })
  711. );
  712. }
  713. if (station.type === "official") {
  714. StationsModule.runJob(
  715. "REMOVE_FIRST_OFFICIAL_PLAYLIST_QUEUE_SONG",
  716. { stationId: station._id },
  717. this
  718. )
  719. .then(() => {
  720. StationsModule.runJob(
  721. "FILL_UP_OFFICIAL_STATION_PLAYLIST_QUEUE",
  722. { stationId: station._id },
  723. this
  724. )
  725. .then(() => {
  726. StationsModule.runJob(
  727. "GET_NEXT_OFFICIAL_STATION_SONG",
  728. { stationId: station._id },
  729. this
  730. )
  731. .then(response => {
  732. next(null, response.song, 0, station);
  733. })
  734. .catch(err => {
  735. if (err === "No songs available.") next(null, null, 0, station);
  736. else next(err);
  737. });
  738. })
  739. .catch(next);
  740. })
  741. .catch(next);
  742. }
  743. },
  744. (song, currentSongIndex, station, next) => {
  745. const $set = {};
  746. if (song === null) $set.currentSong = null;
  747. else if (song.likes === -1 && song.dislikes === -1) {
  748. $set.currentSong = {
  749. songId: song.songId,
  750. title: song.title,
  751. duration: song.duration,
  752. skipDuration: 0,
  753. likes: -1,
  754. dislikes: -1,
  755. requestedAt: song.requestedAt
  756. };
  757. } else {
  758. $set.currentSong = {
  759. _id: song._id,
  760. songId: song.songId,
  761. title: song.title,
  762. artists: song.artists,
  763. duration: song.duration,
  764. likes: song.likes,
  765. dislikes: song.dislikes,
  766. skipDuration: song.skipDuration,
  767. thumbnail: song.thumbnail,
  768. requestedAt: song.requestedAt
  769. };
  770. }
  771. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  772. $set.startedAt = Date.now();
  773. $set.timePaused = 0;
  774. if (station.paused) $set.pausedAt = Date.now();
  775. next(null, $set, station);
  776. },
  777. ($set, station, next) => {
  778. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, err => {
  779. if (err) return next(err);
  780. return StationsModule.runJob(
  781. "UPDATE_STATION",
  782. {
  783. stationId: station._id
  784. },
  785. this
  786. )
  787. .then(station => {
  788. if (station.type === "community" && station.partyMode === true)
  789. CacheModule.runJob("PUB", {
  790. channel: "station.queueUpdate",
  791. value: payload.stationId
  792. })
  793. .then()
  794. .catch();
  795. next(null, station);
  796. })
  797. .catch(next);
  798. });
  799. }
  800. ],
  801. async (err, station) => {
  802. if (err) {
  803. console.log(123, err);
  804. err = await UtilsModule.runJob(
  805. "GET_ERROR",
  806. {
  807. error: err
  808. },
  809. this
  810. );
  811. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  812. reject(new Error(err));
  813. } else {
  814. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  815. station.currentSong.skipVotes = 0;
  816. }
  817. // TODO Pub/Sub this
  818. IOModule.runJob("EMIT_TO_ROOM", {
  819. room: `station.${station._id}`,
  820. args: [
  821. "event:songs.next",
  822. {
  823. currentSong: station.currentSong,
  824. startedAt: station.startedAt,
  825. paused: station.paused,
  826. timePaused: 0
  827. }
  828. ]
  829. })
  830. .then()
  831. .catch();
  832. if (station.privacy === "public") {
  833. IOModule.runJob("EMIT_TO_ROOM", {
  834. room: "home",
  835. args: ["event:station.nextSong", station._id, station.currentSong]
  836. })
  837. .then()
  838. .catch();
  839. } else {
  840. const sockets = await IOModule.runJob("GET_ROOM_SOCKETS", { room: "home" }, this);
  841. Object.keys(sockets).forEach(socketKey => {
  842. const socket = sockets[socketKey];
  843. const { session } = socket;
  844. if (session.sessionId) {
  845. CacheModule.runJob(
  846. "HGET",
  847. {
  848. table: "sessions",
  849. key: session.sessionId
  850. },
  851. this
  852. // eslint-disable-next-line no-loop-func
  853. ).then(session => {
  854. if (session) {
  855. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(
  856. userModel => {
  857. userModel.findOne(
  858. {
  859. _id: session.userId
  860. },
  861. (err, user) => {
  862. if (!err && user) {
  863. if (user.role === "admin")
  864. socket.emit(
  865. "event:station.nextSong",
  866. station._id,
  867. station.currentSong
  868. );
  869. else if (
  870. station.type === "community" &&
  871. station.owner === session.userId
  872. )
  873. socket.emit(
  874. "event:station.nextSong",
  875. station._id,
  876. station.currentSong
  877. );
  878. }
  879. }
  880. );
  881. }
  882. );
  883. }
  884. });
  885. }
  886. });
  887. }
  888. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  889. IOModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  890. sockets: await IOModule.runJob(
  891. "GET_ROOM_SOCKETS",
  892. {
  893. room: `station.${station._id}`
  894. },
  895. this
  896. ),
  897. room: `song.${station.currentSong.songId}`
  898. });
  899. if (!station.paused) {
  900. NotificationsModule.runJob("SCHEDULE", {
  901. name: `stations.nextSong?id=${station._id}`,
  902. time: station.currentSong.duration * 1000,
  903. station
  904. });
  905. }
  906. } else {
  907. IOModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", {
  908. sockets: await IOModule.runJob(
  909. "GET_ROOM_SOCKETS",
  910. {
  911. room: `station.${station._id}`
  912. },
  913. this
  914. )
  915. })
  916. .then()
  917. .catch();
  918. }
  919. resolve({ station });
  920. }
  921. }
  922. );
  923. });
  924. }
  925. /**
  926. * Checks if a user can view/access a station
  927. *
  928. * @param {object} payload - object that contains the payload
  929. * @param {object} payload.station - the station object of the station in question
  930. * @param {string} payload.userId - the id of the user in question
  931. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  932. * @returns {Promise} - returns a promise (resolve, reject)
  933. */
  934. CAN_USER_VIEW_STATION(payload) {
  935. return new Promise((resolve, reject) => {
  936. async.waterfall(
  937. [
  938. next => {
  939. if (payload.station.privacy === "public") return next(true);
  940. if (payload.station.privacy === "unlisted")
  941. if (payload.hideUnlisted === true) return next();
  942. else return next(true);
  943. if (!payload.userId) return next("Not allowed");
  944. return next();
  945. },
  946. next => {
  947. DBModule.runJob(
  948. "GET_MODEL",
  949. {
  950. modelName: "user"
  951. },
  952. this
  953. ).then(userModel => {
  954. userModel.findOne({ _id: payload.userId }, next);
  955. });
  956. },
  957. (user, next) => {
  958. if (!user) return next("Not allowed");
  959. if (user.role === "admin") return next(true);
  960. if (payload.station.type === "official") return next("Not allowed");
  961. if (payload.station.owner === payload.userId) return next(true);
  962. return next("Not allowed");
  963. }
  964. ],
  965. async errOrResult => {
  966. if (errOrResult !== true && errOrResult !== "Not allowed") {
  967. errOrResult = await UtilsModule.runJob(
  968. "GET_ERROR",
  969. {
  970. error: errOrResult
  971. },
  972. this
  973. );
  974. reject(new Error(errOrResult));
  975. } else {
  976. resolve(errOrResult === true);
  977. }
  978. }
  979. );
  980. });
  981. }
  982. /**
  983. * Checks if a user has favorited a station or not
  984. *
  985. * @param {object} payload - object that contains the payload
  986. * @param {object} payload.stationId - the id of the station in question
  987. * @param {string} payload.userId - the id of the user in question
  988. * @returns {Promise} - returns a promise (resolve, reject)
  989. */
  990. HAS_USER_FAVORITED_STATION(payload) {
  991. return new Promise((resolve, reject) => {
  992. async.waterfall(
  993. [
  994. next => {
  995. DBModule.runJob(
  996. "GET_MODEL",
  997. {
  998. modelName: "user"
  999. },
  1000. this
  1001. ).then(userModel => {
  1002. userModel.findOne({ _id: payload.userId }, next);
  1003. });
  1004. },
  1005. (user, next) => {
  1006. if (!user) return next("User not found.");
  1007. if (user.favoriteStations.indexOf(payload.stationId) !== -1) return next(null, true);
  1008. return next(null, false);
  1009. }
  1010. ],
  1011. async (err, isStationFavorited) => {
  1012. if (err && err !== true) {
  1013. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1014. return reject(new Error(err));
  1015. }
  1016. return resolve(isStationFavorited);
  1017. }
  1018. );
  1019. });
  1020. }
  1021. /**
  1022. * Returns a list of sockets in a room that can and can't know about a station
  1023. *
  1024. * @param {object} payload - the payload object
  1025. * @param {object} payload.station - the station object
  1026. * @param {string} payload.room - the socket.io room to get the sockets from
  1027. * @returns {Promise} - returns a promise (resolve, reject)
  1028. */
  1029. GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION(payload) {
  1030. return new Promise((resolve, reject) => {
  1031. IOModule.runJob("GET_ROOM_SOCKETS", { room: payload.room }, this)
  1032. .then(socketsObject => {
  1033. const sockets = Object.keys(socketsObject).map(socketKey => socketsObject[socketKey]);
  1034. let socketsThatCan = [];
  1035. const socketsThatCannot = [];
  1036. if (payload.station.privacy === "public") {
  1037. socketsThatCan = sockets;
  1038. resolve({ socketsThatCan, socketsThatCannot });
  1039. } else {
  1040. async.eachLimit(
  1041. sockets,
  1042. 1,
  1043. (socket, next) => {
  1044. const { session } = socket;
  1045. async.waterfall(
  1046. [
  1047. next => {
  1048. if (!session.sessionId) next("No session id");
  1049. else next();
  1050. },
  1051. next => {
  1052. CacheModule.runJob(
  1053. "HGET",
  1054. {
  1055. table: "sessions",
  1056. key: session.sessionId
  1057. },
  1058. this
  1059. )
  1060. .then(response => {
  1061. next(null, response);
  1062. })
  1063. .catch(next);
  1064. },
  1065. (session, next) => {
  1066. if (!session) next("No session");
  1067. else {
  1068. DBModule.runJob("GET_MODEL", { modelName: "user" }, this)
  1069. .then(userModel => {
  1070. next(null, userModel);
  1071. })
  1072. .catch(next);
  1073. }
  1074. },
  1075. (userModel, next) => {
  1076. if (!userModel) next("No user model");
  1077. else
  1078. userModel.findOne(
  1079. {
  1080. _id: session.userId
  1081. },
  1082. next
  1083. );
  1084. },
  1085. (user, next) => {
  1086. if (!user) next("No user found");
  1087. else if (user.role === "admin") {
  1088. socketsThatCan.push(socket);
  1089. next();
  1090. } else if (
  1091. payload.station.type === "community" &&
  1092. payload.station.owner === session.userId
  1093. ) {
  1094. socketsThatCan.push(socket);
  1095. next();
  1096. }
  1097. }
  1098. ],
  1099. err => {
  1100. if (err) socketsThatCannot.push(socket);
  1101. next();
  1102. }
  1103. );
  1104. },
  1105. err => {
  1106. if (err) reject(err);
  1107. else resolve({ socketsThatCan, socketsThatCannot });
  1108. }
  1109. );
  1110. }
  1111. })
  1112. .catch(reject);
  1113. });
  1114. }
  1115. /**
  1116. * Adds a playlist to be included in a station
  1117. *
  1118. * @param {object} payload - object that contains the payload
  1119. * @param {object} payload.stationId - the id of the station to include the playlist in
  1120. * @param {object} payload.playlistId - the id of the playlist to be included
  1121. * @returns {Promise} - returns a promise (resolve, reject)
  1122. */
  1123. INCLUDE_PLAYLIST(payload) {
  1124. return new Promise((resolve, reject) => {
  1125. async.waterfall(
  1126. [
  1127. next => {
  1128. if (!payload.stationId) next("Please specify a station id");
  1129. else if (!payload.playlistId) next("Please specify a playlist id");
  1130. else next();
  1131. },
  1132. next => {
  1133. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1134. .then(station => {
  1135. next(null, station);
  1136. })
  1137. .catch(next);
  1138. },
  1139. (station, next) => {
  1140. if (station.playlist2 === payload.playlistId) next("You cannot include the station playlist");
  1141. else if (station.includedPlaylists.indexOf(payload.playlistId) !== -1)
  1142. next("This playlist is already included");
  1143. else if (station.excludedPlaylists.indexOf(payload.playlistId) !== -1)
  1144. next(
  1145. "This playlist is currently excluded, please remove it from there before including it"
  1146. );
  1147. else
  1148. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId: payload.playlistId }, this)
  1149. .then(() => {
  1150. next(null);
  1151. })
  1152. .catch(next);
  1153. },
  1154. next => {
  1155. DBModule.runJob(
  1156. "GET_MODEL",
  1157. {
  1158. modelName: "station"
  1159. },
  1160. this
  1161. ).then(stationModel => {
  1162. stationModel.updateOne(
  1163. { _id: payload.stationId },
  1164. { $push: { includedPlaylists: payload.playlistId } },
  1165. next
  1166. );
  1167. });
  1168. },
  1169. (res, next) => {
  1170. StationsModule.runJob(
  1171. "UPDATE_STATION",
  1172. {
  1173. stationId: payload.stationId
  1174. },
  1175. this
  1176. )
  1177. .then(() => {
  1178. next();
  1179. })
  1180. .catch(next);
  1181. }
  1182. ],
  1183. async err => {
  1184. if (err && err !== true) {
  1185. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1186. return reject(new Error(err));
  1187. }
  1188. return resolve();
  1189. }
  1190. );
  1191. });
  1192. }
  1193. /**
  1194. * Removes a playlist that is included in a station
  1195. *
  1196. * @param {object} payload - object that contains the payload
  1197. * @param {object} payload.stationId - the id of the station
  1198. * @param {object} payload.playlistId - the id of the playlist
  1199. * @returns {Promise} - returns a promise (resolve, reject)
  1200. */
  1201. REMOVE_INCLUDED_PLAYLIST(payload) {
  1202. return new Promise((resolve, reject) => {
  1203. async.waterfall(
  1204. [
  1205. next => {
  1206. if (!payload.stationId) next("Please specify a station id");
  1207. else if (!payload.playlistId) next("Please specify a playlist id");
  1208. else next();
  1209. },
  1210. next => {
  1211. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1212. .then(station => {
  1213. next(null, station);
  1214. })
  1215. .catch(next);
  1216. },
  1217. (station, next) => {
  1218. if (station.includedPlaylists.indexOf(payload.playlistId) === -1)
  1219. next("This playlist isn't included");
  1220. else next();
  1221. },
  1222. next => {
  1223. DBModule.runJob(
  1224. "GET_MODEL",
  1225. {
  1226. modelName: "station"
  1227. },
  1228. this
  1229. ).then(stationModel => {
  1230. stationModel.updateOne(
  1231. { _id: payload.stationId },
  1232. { $pull: { includedPlaylists: payload.playlistId } },
  1233. next
  1234. );
  1235. });
  1236. },
  1237. (res, next) => {
  1238. StationsModule.runJob(
  1239. "UPDATE_STATION",
  1240. {
  1241. stationId: payload.stationId
  1242. },
  1243. this
  1244. )
  1245. .then(() => {
  1246. next();
  1247. })
  1248. .catch(next);
  1249. }
  1250. ],
  1251. async err => {
  1252. if (err && err !== true) {
  1253. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1254. return reject(new Error(err));
  1255. }
  1256. return resolve();
  1257. }
  1258. );
  1259. });
  1260. }
  1261. /**
  1262. * Adds a playlist to be excluded in a station
  1263. *
  1264. * @param {object} payload - object that contains the payload
  1265. * @param {object} payload.stationId - the id of the station
  1266. * @param {object} payload.playlistId - the id of the playlist
  1267. * @returns {Promise} - returns a promise (resolve, reject)
  1268. */
  1269. EXCLUDE_PLAYLIST(payload) {
  1270. return new Promise((resolve, reject) => {
  1271. async.waterfall(
  1272. [
  1273. next => {
  1274. if (!payload.stationId) next("Please specify a station id");
  1275. else if (!payload.playlistId) next("Please specify a playlist id");
  1276. else next();
  1277. },
  1278. next => {
  1279. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1280. .then(station => {
  1281. next(null, station);
  1282. })
  1283. .catch(next);
  1284. },
  1285. (station, next) => {
  1286. if (station.playlist2 === payload.playlistId) next("You cannot exclude the station playlist");
  1287. else if (station.excludedPlaylists.indexOf(payload.playlistId) !== -1)
  1288. next("This playlist is already excluded");
  1289. else if (station.includedPlaylists.indexOf(payload.playlistId) !== -1)
  1290. next(
  1291. "This playlist is currently included, please remove it from there before excluding it"
  1292. );
  1293. else
  1294. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId: payload.playlistId }, this)
  1295. .then(() => {
  1296. next(null);
  1297. })
  1298. .catch(next);
  1299. },
  1300. next => {
  1301. DBModule.runJob(
  1302. "GET_MODEL",
  1303. {
  1304. modelName: "station"
  1305. },
  1306. this
  1307. ).then(stationModel => {
  1308. stationModel.updateOne(
  1309. { _id: payload.stationId },
  1310. { $push: { excludedPlaylists: payload.playlistId } },
  1311. next
  1312. );
  1313. });
  1314. },
  1315. (res, next) => {
  1316. StationsModule.runJob(
  1317. "UPDATE_STATION",
  1318. {
  1319. stationId: payload.stationId
  1320. },
  1321. this
  1322. )
  1323. .then(() => {
  1324. next();
  1325. })
  1326. .catch(next);
  1327. }
  1328. ],
  1329. async err => {
  1330. if (err && err !== true) {
  1331. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1332. return reject(new Error(err));
  1333. }
  1334. return resolve();
  1335. }
  1336. );
  1337. });
  1338. }
  1339. /**
  1340. * Removes a playlist that is excluded in a station
  1341. *
  1342. * @param {object} payload - object that contains the payload
  1343. * @param {object} payload.stationId - the id of the station
  1344. * @param {object} payload.playlistId - the id of the playlist
  1345. * @returns {Promise} - returns a promise (resolve, reject)
  1346. */
  1347. REMOVE_EXCLUDED_PLAYLIST(payload) {
  1348. return new Promise((resolve, reject) => {
  1349. console.log(112, payload);
  1350. async.waterfall(
  1351. [
  1352. next => {
  1353. if (!payload.stationId) next("Please specify a station id");
  1354. else if (!payload.playlistId) next("Please specify a playlist id");
  1355. else next();
  1356. },
  1357. next => {
  1358. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1359. .then(station => {
  1360. next(null, station);
  1361. })
  1362. .catch(next);
  1363. },
  1364. (station, next) => {
  1365. if (station.excludedPlaylists.indexOf(payload.playlistId) === -1)
  1366. next("This playlist isn't excluded");
  1367. else next();
  1368. },
  1369. next => {
  1370. DBModule.runJob(
  1371. "GET_MODEL",
  1372. {
  1373. modelName: "station"
  1374. },
  1375. this
  1376. ).then(stationModel => {
  1377. stationModel.updateOne(
  1378. { _id: payload.stationId },
  1379. { $pull: { excludedPlaylists: payload.playlistId } },
  1380. next
  1381. );
  1382. });
  1383. },
  1384. (res, next) => {
  1385. StationsModule.runJob(
  1386. "UPDATE_STATION",
  1387. {
  1388. stationId: payload.stationId
  1389. },
  1390. this
  1391. )
  1392. .then(() => {
  1393. next();
  1394. })
  1395. .catch(next);
  1396. }
  1397. ],
  1398. async err => {
  1399. if (err && err !== true) {
  1400. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1401. return reject(new Error(err));
  1402. }
  1403. return resolve();
  1404. }
  1405. );
  1406. });
  1407. }
  1408. /**
  1409. * Gets stations that include or exclude a specific playlist
  1410. *
  1411. * @param {object} payload - object that contains the payload
  1412. * @param {string} payload.playlistId - the playlist id
  1413. * @returns {Promise} - returns promise (reject, resolve)
  1414. */
  1415. GET_STATIONS_THAT_INCLUDE_OR_EXCLUDE_PLAYLIST(payload) {
  1416. return new Promise((resolve, reject) => {
  1417. DBModule.runJob(
  1418. "GET_MODEL",
  1419. {
  1420. modelName: "station"
  1421. },
  1422. this
  1423. ).then(stationModel => {
  1424. stationModel.find(
  1425. { $or: [{ includedPlaylists: payload.playlistId }, { excludedPlaylists: payload.playlistId }] },
  1426. (err, stations) => {
  1427. if (err) reject(err);
  1428. else resolve({ stationIds: stations.map(station => station._id) });
  1429. }
  1430. );
  1431. });
  1432. });
  1433. }
  1434. }
  1435. export default new _StationsModule();