stations.js 45 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let WSModule;
  8. let SongsModule;
  9. let PlaylistsModule;
  10. let NotificationsModule;
  11. class _StationsModule extends CoreClass {
  12. // eslint-disable-next-line require-jsdoc
  13. constructor() {
  14. super("stations");
  15. StationsModule = this;
  16. }
  17. /**
  18. * Initialises the stations module
  19. *
  20. * @returns {Promise} - returns promise (reject, resolve)
  21. */
  22. async initialize() {
  23. CacheModule = this.moduleManager.modules.cache;
  24. DBModule = this.moduleManager.modules.db;
  25. UtilsModule = this.moduleManager.modules.utils;
  26. WSModule = this.moduleManager.modules.ws;
  27. SongsModule = this.moduleManager.modules.songs;
  28. PlaylistsModule = this.moduleManager.modules.playlists;
  29. NotificationsModule = this.moduleManager.modules.notifications;
  30. this.userList = {};
  31. this.usersPerStation = {};
  32. this.usersPerStationCount = {};
  33. // TEMP
  34. CacheModule.runJob("SUB", {
  35. channel: "station.pause",
  36. cb: async stationId => {
  37. NotificationsModule.runJob("REMOVE", {
  38. subscription: `stations.nextSong?id=${stationId}`
  39. }).then();
  40. }
  41. });
  42. CacheModule.runJob("SUB", {
  43. channel: "station.resume",
  44. cb: async stationId => {
  45. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  46. }
  47. });
  48. CacheModule.runJob("SUB", {
  49. channel: "station.queueUpdate",
  50. cb: async stationId => {
  51. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  52. if (!station.currentSong && station.queue.length > 0) {
  53. StationsModule.runJob("INITIALIZE_STATION", {
  54. stationId
  55. }).then();
  56. }
  57. });
  58. }
  59. });
  60. CacheModule.runJob("SUB", {
  61. channel: "station.newOfficialPlaylist",
  62. cb: async stationId => {
  63. CacheModule.runJob("HGET", {
  64. table: "officialPlaylists",
  65. key: stationId
  66. }).then(playlistObj => {
  67. if (playlistObj) {
  68. WSModule.runJob("EMIT_TO_ROOM", {
  69. room: `station.${stationId}`,
  70. args: ["event:newOfficialPlaylist", { data: { playlist: playlistObj.songs } }]
  71. });
  72. }
  73. });
  74. }
  75. });
  76. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  77. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  78. return new Promise((resolve, reject) =>
  79. async.waterfall(
  80. [
  81. next => {
  82. this.setStage(2);
  83. CacheModule.runJob("HGETALL", { table: "stations" })
  84. .then(stations => {
  85. next(null, stations);
  86. })
  87. .catch(next);
  88. },
  89. (stations, next) => {
  90. this.setStage(3);
  91. if (!stations) return next();
  92. const stationIds = Object.keys(stations);
  93. return async.each(
  94. stationIds,
  95. (stationId, next) => {
  96. stationModel.findOne({ _id: stationId }, (err, station) => {
  97. if (err) next(err);
  98. else if (!station) {
  99. CacheModule.runJob("HDEL", {
  100. table: "stations",
  101. key: stationId
  102. })
  103. .then(() => {
  104. next();
  105. })
  106. .catch(next);
  107. } else next();
  108. });
  109. },
  110. next
  111. );
  112. },
  113. next => {
  114. this.setStage(4);
  115. stationModel.find({}, next);
  116. },
  117. (stations, next) => {
  118. this.setStage(5);
  119. async.each(
  120. stations,
  121. (station, next2) => {
  122. async.waterfall(
  123. [
  124. next => {
  125. CacheModule.runJob("HSET", {
  126. table: "stations",
  127. key: station._id,
  128. value: stationSchema(station)
  129. })
  130. .then(station => next(null, station))
  131. .catch(next);
  132. },
  133. (station, next) => {
  134. StationsModule.runJob(
  135. "INITIALIZE_STATION",
  136. {
  137. stationId: station._id
  138. },
  139. null,
  140. -1
  141. )
  142. .then(() => {
  143. next();
  144. })
  145. .catch(next);
  146. }
  147. ],
  148. err => {
  149. next2(err);
  150. }
  151. );
  152. },
  153. next
  154. );
  155. }
  156. ],
  157. async err => {
  158. if (err) {
  159. err = await UtilsModule.runJob("GET_ERROR", {
  160. error: err
  161. });
  162. reject(new Error(err));
  163. } else {
  164. resolve();
  165. }
  166. }
  167. )
  168. );
  169. }
  170. /**
  171. * Initialises a station
  172. *
  173. * @param {object} payload - object that contains the payload
  174. * @param {string} payload.stationId - id of the station to initialise
  175. * @returns {Promise} - returns a promise (resolve, reject)
  176. */
  177. INITIALIZE_STATION(payload) {
  178. return new Promise((resolve, reject) => {
  179. async.waterfall(
  180. [
  181. next => {
  182. StationsModule.runJob(
  183. "GET_STATION",
  184. {
  185. stationId: payload.stationId
  186. },
  187. this
  188. )
  189. .then(station => {
  190. next(null, station);
  191. })
  192. .catch(next);
  193. },
  194. (station, next) => {
  195. if (!station) return next("Station not found.");
  196. return NotificationsModule.runJob(
  197. "UNSCHEDULE",
  198. {
  199. name: `stations.nextSong?id=${station._id}`
  200. },
  201. this
  202. )
  203. .then()
  204. .catch()
  205. .finally(() => {
  206. NotificationsModule.runJob("SUBSCRIBE", {
  207. name: `stations.nextSong?id=${station._id}`,
  208. cb: () =>
  209. StationsModule.runJob("SKIP_STATION", {
  210. stationId: station._id,
  211. natural: true
  212. }),
  213. unique: true,
  214. station
  215. })
  216. .then()
  217. .catch();
  218. if (station.paused) return next(true, station);
  219. return next(null, station);
  220. });
  221. },
  222. (station, next) => {
  223. if (!station.currentSong) {
  224. return StationsModule.runJob(
  225. "SKIP_STATION",
  226. {
  227. stationId: station._id,
  228. natural: false
  229. },
  230. this
  231. )
  232. .then(station => {
  233. next(true, station);
  234. })
  235. .catch(next)
  236. .finally(() => {});
  237. }
  238. let timeLeft =
  239. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  240. if (Number.isNaN(timeLeft)) timeLeft = -1;
  241. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  242. return StationsModule.runJob(
  243. "SKIP_STATION",
  244. {
  245. stationId: station._id,
  246. natural: false
  247. },
  248. this
  249. )
  250. .then(station => {
  251. next(null, station);
  252. })
  253. .catch(next);
  254. }
  255. // name, time, cb, station
  256. NotificationsModule.runJob("SCHEDULE", {
  257. name: `stations.nextSong?id=${station._id}`,
  258. time: timeLeft,
  259. station
  260. });
  261. return next(null, station);
  262. }
  263. ],
  264. async (err, station) => {
  265. if (err && err !== true) {
  266. err = await UtilsModule.runJob(
  267. "GET_ERROR",
  268. {
  269. error: err
  270. },
  271. this
  272. );
  273. reject(new Error(err));
  274. } else resolve(station);
  275. }
  276. );
  277. });
  278. }
  279. /**
  280. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  281. *
  282. * @param {object} payload - object that contains the payload
  283. * @param {string} payload.stationId - id of the station
  284. * @returns {Promise} - returns a promise (resolve, reject)
  285. */
  286. GET_STATION(payload) {
  287. return new Promise((resolve, reject) => {
  288. async.waterfall(
  289. [
  290. next => {
  291. CacheModule.runJob("HGET", { table: "stations", key: payload.stationId }, this)
  292. .then(station => next(null, station))
  293. .catch(next);
  294. },
  295. (station, next) => {
  296. if (station) return next(true, station);
  297. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  298. },
  299. (station, next) => {
  300. if (station) {
  301. station = StationsModule.stationSchema(station);
  302. CacheModule.runJob("HSET", {
  303. table: "stations",
  304. key: payload.stationId,
  305. value: station
  306. })
  307. .then()
  308. .catch();
  309. next(true, station);
  310. } else next("Station not found");
  311. }
  312. ],
  313. async (err, station) => {
  314. if (err && err !== true) {
  315. err = await UtilsModule.runJob(
  316. "GET_ERROR",
  317. {
  318. error: err
  319. },
  320. this
  321. );
  322. reject(new Error(err));
  323. } else resolve(station);
  324. }
  325. );
  326. });
  327. }
  328. /**
  329. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  330. *
  331. * @param {object} payload - object that contains the payload
  332. * @param {string} payload.stationName - the unique name of the station
  333. * @returns {Promise} - returns a promise (resolve, reject)
  334. */
  335. async GET_STATION_BY_NAME(payload) {
  336. return new Promise((resolve, reject) =>
  337. async.waterfall(
  338. [
  339. next => {
  340. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  341. },
  342. (station, next) => {
  343. if (station) {
  344. station = StationsModule.stationSchema(station);
  345. CacheModule.runJob("HSET", {
  346. table: "stations",
  347. key: station._id,
  348. value: station
  349. });
  350. next(true, station);
  351. } else next("Station not found");
  352. }
  353. ],
  354. (err, station) => {
  355. if (err && err !== true) return reject(new Error(err));
  356. return resolve(station);
  357. }
  358. )
  359. );
  360. }
  361. /**
  362. * Gets stations data
  363. *
  364. * @param {object} payload - object containing the payload
  365. * @param {string} payload.page - the page
  366. * @param {string} payload.pageSize - the page size
  367. * @param {string} payload.properties - the properties to return for each station
  368. * @param {string} payload.sort - the sort object
  369. * @param {string} payload.queries - the queries array
  370. * @param {string} payload.operator - the operator for queries
  371. * @returns {Promise} - returns a promise (resolve, reject)
  372. */
  373. GET_DATA(payload) {
  374. return new Promise((resolve, reject) => {
  375. const { page, pageSize, properties, sort, queries, operator } = payload;
  376. console.log("GET_DATA", payload);
  377. const newQueries = queries.map(query => {
  378. const { data, filter, filterType } = query;
  379. const newQuery = {};
  380. if (filterType === "regex") {
  381. newQuery[filter.property] = new RegExp(`${data.slice(1, data.length - 1)}`, "i");
  382. } else if (filterType === "contains") {
  383. newQuery[filter.property] = new RegExp(`${data.replaceAll(/[.*+?^${}()|[\]\\]/g, "\\$&")}`, "i");
  384. } else if (filterType === "exact") {
  385. newQuery[filter.property] = data.toString();
  386. }
  387. return newQuery;
  388. });
  389. const queryObject = {};
  390. if (newQueries.length > 0) {
  391. if (operator === "and") queryObject.$and = newQueries;
  392. else if (operator === "or") queryObject.$or = newQueries;
  393. else if (operator === "nor") queryObject.$nor = newQueries;
  394. }
  395. async.waterfall(
  396. [
  397. next => {
  398. StationsModule.stationModel.find(queryObject).count((err, count) => {
  399. next(err, count);
  400. });
  401. },
  402. (count, next) => {
  403. StationsModule.stationModel
  404. .find(queryObject)
  405. .sort(sort)
  406. .skip(pageSize * (page - 1))
  407. .limit(pageSize)
  408. .select(properties.join(" "))
  409. .exec((err, stations) => {
  410. next(err, count, stations);
  411. });
  412. }
  413. ],
  414. (err, count, stations) => {
  415. if (err && err !== true) return reject(new Error(err));
  416. return resolve({ data: stations, count });
  417. }
  418. );
  419. });
  420. }
  421. /**
  422. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  423. *
  424. * @param {object} payload - object that contains the payload
  425. * @param {string} payload.stationId - the id of the station to update
  426. * @returns {Promise} - returns a promise (resolve, reject)
  427. */
  428. UPDATE_STATION(payload) {
  429. return new Promise((resolve, reject) => {
  430. async.waterfall(
  431. [
  432. next => {
  433. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  434. },
  435. (station, next) => {
  436. if (!station) {
  437. CacheModule.runJob("HDEL", {
  438. table: "stations",
  439. key: payload.stationId
  440. })
  441. .then()
  442. .catch();
  443. return next("Station not found");
  444. }
  445. return CacheModule.runJob(
  446. "HSET",
  447. {
  448. table: "stations",
  449. key: payload.stationId,
  450. value: station
  451. },
  452. this
  453. )
  454. .then(station => {
  455. next(null, station);
  456. })
  457. .catch(next);
  458. }
  459. ],
  460. async (err, station) => {
  461. if (err && err !== true) {
  462. err = await UtilsModule.runJob(
  463. "GET_ERROR",
  464. {
  465. error: err
  466. },
  467. this
  468. );
  469. reject(new Error(err));
  470. } else resolve(station);
  471. }
  472. );
  473. });
  474. }
  475. /**
  476. * Fills up the official station playlist queue using the songs from the official station playlist
  477. *
  478. * @param {object} payload - object that contains the payload
  479. * @param {string} payload.stationId - the id of the station
  480. * @param {string} payload.ignoreExistingQueue - ignore the existing queue songs, replacing the old queue with a completely fresh one
  481. * @returns {Promise} - returns a promise (resolve, reject)
  482. */
  483. FILL_UP_STATION_QUEUE_FROM_STATION_PLAYLIST(payload) {
  484. return new Promise((resolve, reject) => {
  485. const { stationId, ignoreExistingQueue } = payload;
  486. async.waterfall(
  487. [
  488. next => {
  489. PlaylistsModule.runJob("GET_STATION_PLAYLIST", { stationId, includeSongs: true }, this)
  490. .then(response => {
  491. next(null, response.playlist);
  492. })
  493. .catch(next);
  494. },
  495. (playlist, next) => {
  496. StationsModule.runJob("GET_STATION", { stationId }, this)
  497. .then(station => {
  498. if (ignoreExistingQueue) station.queue = [];
  499. next(null, playlist, station);
  500. })
  501. .catch(next);
  502. },
  503. (playlist, station, next) => {
  504. if (station.playMode === "random") {
  505. UtilsModule.runJob("SHUFFLE", { array: playlist.songs }, this)
  506. .then(response => {
  507. next(null, response.array, station);
  508. })
  509. .catch(next);
  510. } else next(null, playlist.songs, station);
  511. },
  512. (_playlistSongs, station, next) => {
  513. let playlistSongs = JSON.parse(JSON.stringify(_playlistSongs));
  514. if (station.playMode === "sequential") {
  515. if (station.currentSongIndex <= playlistSongs.length) {
  516. const songsToAddToEnd = playlistSongs.splice(0, station.currentSongIndex);
  517. playlistSongs = [...playlistSongs, ...songsToAddToEnd];
  518. }
  519. }
  520. const songsStillNeeded = 50 - station.queue.length;
  521. const currentSongs = station.queue;
  522. const currentYoutubeIds = station.queue.map(song => song.youtubeId);
  523. const songsToAdd = [];
  524. let lastSongAdded = null;
  525. playlistSongs.every(song => {
  526. if (
  527. songsToAdd.length < songsStillNeeded &&
  528. currentYoutubeIds.indexOf(song.youtubeId) === -1
  529. ) {
  530. lastSongAdded = song;
  531. songsToAdd.push(song);
  532. return true;
  533. }
  534. if (songsToAdd.length >= songsStillNeeded) return false;
  535. return true;
  536. });
  537. let { currentSongIndex } = station;
  538. if (station.playMode === "sequential" && lastSongAdded) {
  539. const indexOfLastSong = _playlistSongs
  540. .map(song => song.youtubeId)
  541. .indexOf(lastSongAdded.youtubeId);
  542. if (indexOfLastSong !== -1) currentSongIndex = indexOfLastSong;
  543. }
  544. next(null, currentSongs, songsToAdd, currentSongIndex);
  545. },
  546. (currentSongs, songsToAdd, currentSongIndex, next) => {
  547. SongsModule.runJob("GET_SONGS", {
  548. songIds: songsToAdd.map(song => song._id),
  549. properties: [
  550. "youtubeId",
  551. "title",
  552. "duration",
  553. "skipDuration",
  554. "artists",
  555. "thumbnail",
  556. "status"
  557. ]
  558. })
  559. .then(response => {
  560. const newSongsToAdd = songsToAdd.map(song =>
  561. response.songs.find(newSong => newSong._id.toString() === song._id.toString())
  562. );
  563. next(null, currentSongs, newSongsToAdd, currentSongIndex);
  564. })
  565. .catch(err => next(err));
  566. },
  567. (currentSongs, songsToAdd, currentSongIndex, next) => {
  568. const newPlaylist = [...currentSongs, ...songsToAdd].map(song => {
  569. if (!song._id) song._id = null;
  570. return song;
  571. });
  572. next(null, newPlaylist, currentSongIndex);
  573. },
  574. (newPlaylist, currentSongIndex, next) => {
  575. StationsModule.stationModel.updateOne(
  576. { _id: stationId },
  577. { $set: { queue: newPlaylist, currentSongIndex } },
  578. { runValidators: true },
  579. err => {
  580. if (err) next(err);
  581. else
  582. StationsModule.runJob(
  583. "UPDATE_STATION",
  584. {
  585. stationId
  586. },
  587. this
  588. )
  589. .then(() => {
  590. next(null);
  591. })
  592. .catch(next);
  593. }
  594. );
  595. }
  596. ],
  597. err => {
  598. if (err) reject(err);
  599. else resolve();
  600. }
  601. );
  602. });
  603. }
  604. /**
  605. * Gets next station song
  606. *
  607. * @param {object} payload - object that contains the payload
  608. * @param {string} payload.stationId - the id of the station
  609. * @returns {Promise} - returns a promise (resolve, reject)
  610. */
  611. GET_NEXT_STATION_SONG(payload) {
  612. return new Promise((resolve, reject) => {
  613. const { stationId } = payload;
  614. async.waterfall(
  615. [
  616. next => {
  617. StationsModule.runJob("GET_STATION", { stationId }, this)
  618. .then(station => {
  619. next(null, station);
  620. })
  621. .catch(next);
  622. },
  623. (station, next) => {
  624. if (station.queue.length === 0) next("No songs available.");
  625. else {
  626. next(null, station.queue[0]);
  627. }
  628. },
  629. (queueSong, next) => {
  630. if (!queueSong._id) next(null, queueSong);
  631. else
  632. SongsModule.runJob("GET_SONG", { songId: queueSong._id }, this)
  633. .then(response => {
  634. const { song } = response;
  635. if (song) {
  636. const newSong = {
  637. _id: song._id,
  638. youtubeId: song.youtubeId,
  639. title: song.title,
  640. artists: song.artists,
  641. duration: song.duration,
  642. skipDuration: song.skipDuration,
  643. thumbnail: song.thumbnail,
  644. requestedAt: queueSong.requestedAt,
  645. requestedBy: queueSong.requestedBy,
  646. likes: song.likes,
  647. dislikes: song.dislikes,
  648. status: song.status
  649. };
  650. return next(null, newSong);
  651. }
  652. return next(null, song);
  653. })
  654. .catch(err => {
  655. next(err);
  656. });
  657. }
  658. ],
  659. (err, song) => {
  660. if (err) console.log(33333, err, payload);
  661. if (err) reject(err);
  662. else resolve({ song });
  663. }
  664. );
  665. });
  666. }
  667. /**
  668. * Removes first station queue song
  669. *
  670. * @param {object} payload - object that contains the payload
  671. * @param {string} payload.stationId - the id of the station
  672. * @returns {Promise} - returns a promise (resolve, reject)
  673. */
  674. REMOVE_FIRST_QUEUE_SONG(payload) {
  675. return new Promise((resolve, reject) => {
  676. const { stationId } = payload;
  677. async.waterfall(
  678. [
  679. next => {
  680. StationsModule.stationModel.updateOne(
  681. { _id: stationId },
  682. { $pop: { queue: -1 } },
  683. { runValidators: true },
  684. err => {
  685. if (err) next(err);
  686. else
  687. StationsModule.runJob(
  688. "UPDATE_STATION",
  689. {
  690. stationId
  691. },
  692. this
  693. )
  694. .then(() => {
  695. next(null);
  696. })
  697. .catch(next);
  698. }
  699. );
  700. }
  701. ],
  702. err => {
  703. if (err) reject(err);
  704. else resolve();
  705. }
  706. );
  707. });
  708. }
  709. /**
  710. * Skips a station
  711. *
  712. * @param {object} payload - object that contains the payload
  713. * @param {string} payload.stationId - the id of the station to skip
  714. * @param {string} payload.natural - whether to skip naturally or forcefully
  715. * @returns {Promise} - returns a promise (resolve, reject)
  716. */
  717. SKIP_STATION(payload) {
  718. return new Promise((resolve, reject) => {
  719. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  720. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  721. async.waterfall(
  722. [
  723. // Clears up any existing timers that would skip the station if the song ends
  724. next => {
  725. NotificationsModule.runJob("UNSCHEDULE", {
  726. name: `stations.nextSong?id=${payload.stationId}`
  727. })
  728. .then(() => {
  729. next();
  730. })
  731. .catch(next);
  732. },
  733. // Gets the station object
  734. next => {
  735. StationsModule.runJob(
  736. "GET_STATION",
  737. {
  738. stationId: payload.stationId
  739. },
  740. this
  741. )
  742. .then(station => next(null, station))
  743. .catch(next);
  744. },
  745. // eslint-disable-next-line consistent-return
  746. (station, next) => {
  747. if (!station) return next("Station not found.");
  748. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  749. return next(null, null, station); // Community station with party mode enabled and no songs in the queue
  750. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  751. // Community station with party mode enabled and songs in the queue
  752. if (station.paused) return next(null, null, station);
  753. StationsModule.runJob("GET_NEXT_STATION_SONG", { stationId: station._id }, this)
  754. .then(response => {
  755. StationsModule.runJob(
  756. "REMOVE_FIRST_QUEUE_SONG",
  757. { stationId: station._id },
  758. this
  759. ).then(() => {
  760. next(null, response.song, station);
  761. });
  762. })
  763. .catch(err => {
  764. if (err === "No songs available.") next(null, null, station);
  765. else next(err);
  766. });
  767. }
  768. if (station.type === "community" && !station.partyMode) {
  769. StationsModule.runJob(
  770. "FILL_UP_STATION_QUEUE_FROM_STATION_PLAYLIST",
  771. { stationId: station._id },
  772. this
  773. )
  774. .then(() => {
  775. StationsModule.runJob("GET_NEXT_STATION_SONG", { stationId: station._id }, this)
  776. .then(response => {
  777. StationsModule.runJob(
  778. "REMOVE_FIRST_QUEUE_SONG",
  779. { stationId: station._id },
  780. this
  781. ).then(() => {
  782. next(null, response.song, station);
  783. });
  784. })
  785. .catch(err => {
  786. if (err === "No songs available.") next(null, null, station);
  787. else next(err);
  788. });
  789. })
  790. .catch(next);
  791. }
  792. if (station.type === "official") {
  793. StationsModule.runJob(
  794. "FILL_UP_STATION_QUEUE_FROM_STATION_PLAYLIST",
  795. { stationId: station._id },
  796. this
  797. )
  798. .then(() => {
  799. StationsModule.runJob("GET_NEXT_STATION_SONG", { stationId: station._id }, this)
  800. .then(response => {
  801. StationsModule.runJob(
  802. "REMOVE_FIRST_QUEUE_SONG",
  803. { stationId: station._id },
  804. this
  805. )
  806. .then(() => {
  807. next(null, response.song, station);
  808. })
  809. .catch(next);
  810. })
  811. .catch(err => {
  812. if (err === "No songs available.") next(null, null, station);
  813. else next(err);
  814. });
  815. })
  816. .catch(next);
  817. }
  818. },
  819. (song, station, next) => {
  820. const $set = {};
  821. if (song === null) $set.currentSong = null;
  822. else {
  823. $set.currentSong = {
  824. _id: song._id,
  825. youtubeId: song.youtubeId,
  826. title: song.title,
  827. artists: song.artists,
  828. duration: song.duration,
  829. skipDuration: song.skipDuration,
  830. thumbnail: song.thumbnail,
  831. requestedAt: song.requestedAt,
  832. requestedBy: song.requestedBy,
  833. status: song.status
  834. };
  835. }
  836. $set.startedAt = Date.now();
  837. $set.timePaused = 0;
  838. if (station.paused) $set.pausedAt = Date.now();
  839. next(null, $set, song, station);
  840. },
  841. ($set, song, station, next) => {
  842. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, err => {
  843. if (err) return next(err);
  844. return StationsModule.runJob("UPDATE_STATION", { stationId: station._id }, this)
  845. .then(station => {
  846. CacheModule.runJob("PUB", {
  847. channel: "station.queueUpdate",
  848. value: payload.stationId
  849. })
  850. .then()
  851. .catch();
  852. next(null, station, song);
  853. })
  854. .catch(next);
  855. });
  856. },
  857. (station, song, next) => {
  858. if (station.currentSong !== null && station.currentSong.youtubeId !== undefined) {
  859. station.currentSong.likes = song.likes;
  860. station.currentSong.dislikes = song.dislikes;
  861. station.currentSong.skipVotes = 0;
  862. }
  863. next(null, station);
  864. }
  865. ],
  866. async (err, station) => {
  867. if (err) {
  868. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  869. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  870. return reject(new Error(err));
  871. }
  872. // TODO Pub/Sub this
  873. const { currentSong } = station;
  874. WSModule.runJob("EMIT_TO_ROOM", {
  875. room: `station.${station._id}`,
  876. args: [
  877. "event:station.nextSong",
  878. {
  879. data: {
  880. currentSong,
  881. startedAt: station.startedAt,
  882. paused: station.paused,
  883. timePaused: 0,
  884. natural: payload.natural
  885. }
  886. }
  887. ]
  888. });
  889. WSModule.runJob("EMIT_TO_ROOM", {
  890. room: `manage-station.${station._id}`,
  891. args: ["event:station.nextSong", { data: { stationId: station._id, currentSong } }]
  892. });
  893. if (station.privacy === "public")
  894. WSModule.runJob("EMIT_TO_ROOM", {
  895. room: "home",
  896. args: ["event:station.nextSong", { data: { stationId: station._id, currentSong } }]
  897. });
  898. else {
  899. const sockets = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: "home" }, this);
  900. sockets.forEach(async socketId => {
  901. const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId });
  902. if (!socket) return;
  903. const { session } = socket;
  904. if (session.sessionId) {
  905. CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId }).then(
  906. session => {
  907. if (session) {
  908. DBModule.runJob("GET_MODEL", { modelName: "user" }).then(userModel => {
  909. userModel.findOne({ _id: session.userId }, (err, user) => {
  910. if (!err && user) {
  911. if (user.role === "admin")
  912. socket.dispatch("event:station.nextSong", {
  913. data: {
  914. stationId: station._id,
  915. currentSong
  916. }
  917. });
  918. else if (
  919. station.type === "community" &&
  920. station.owner === session.userId
  921. )
  922. socket.dispatch("event:station.nextSong", {
  923. data: {
  924. stationId: station._id,
  925. currentSong
  926. }
  927. });
  928. }
  929. });
  930. });
  931. }
  932. }
  933. );
  934. }
  935. });
  936. }
  937. WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: `station.${station._id}` }).then(sockets => {
  938. if (station.currentSong !== null && station.currentSong.youtubeId !== undefined) {
  939. WSModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  940. sockets,
  941. room: `song.${station.currentSong.youtubeId}`
  942. });
  943. if (!station.paused) {
  944. NotificationsModule.runJob("SCHEDULE", {
  945. name: `stations.nextSong?id=${station._id}`,
  946. time: station.currentSong.duration * 1000,
  947. station
  948. });
  949. }
  950. } else WSModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", { sockets });
  951. });
  952. return resolve({ station });
  953. }
  954. );
  955. });
  956. }
  957. /**
  958. * Checks if a user can view/access a station
  959. *
  960. * @param {object} payload - object that contains the payload
  961. * @param {object} payload.station - the station object of the station in question
  962. * @param {string} payload.userId - the id of the user in question
  963. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  964. * @returns {Promise} - returns a promise (resolve, reject)
  965. */
  966. CAN_USER_VIEW_STATION(payload) {
  967. return new Promise((resolve, reject) => {
  968. async.waterfall(
  969. [
  970. next => {
  971. if (payload.station.privacy === "public") return next(true);
  972. if (payload.station.privacy === "unlisted")
  973. if (payload.hideUnlisted === true) return next();
  974. else return next(true);
  975. if (!payload.userId) return next("Not allowed");
  976. return next();
  977. },
  978. next => {
  979. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(userModel => {
  980. userModel.findOne({ _id: payload.userId }, next);
  981. });
  982. },
  983. (user, next) => {
  984. if (!user) return next("Not allowed");
  985. if (user.role === "admin") return next(true);
  986. if (payload.station.type === "official") return next("Not allowed");
  987. if (payload.station.owner === payload.userId) return next(true);
  988. return next("Not allowed");
  989. }
  990. ],
  991. async errOrResult => {
  992. if (errOrResult !== true && errOrResult !== "Not allowed") {
  993. errOrResult = await UtilsModule.runJob(
  994. "GET_ERROR",
  995. {
  996. error: errOrResult
  997. },
  998. this
  999. );
  1000. reject(new Error(errOrResult));
  1001. } else {
  1002. resolve(errOrResult === true);
  1003. }
  1004. }
  1005. );
  1006. });
  1007. }
  1008. /**
  1009. * Checks if a user has favorited a station or not
  1010. *
  1011. * @param {object} payload - object that contains the payload
  1012. * @param {object} payload.stationId - the id of the station in question
  1013. * @param {string} payload.userId - the id of the user in question
  1014. * @returns {Promise} - returns a promise (resolve, reject)
  1015. */
  1016. HAS_USER_FAVORITED_STATION(payload) {
  1017. return new Promise((resolve, reject) => {
  1018. async.waterfall(
  1019. [
  1020. next => {
  1021. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(userModel => {
  1022. userModel.findOne({ _id: payload.userId }, next);
  1023. });
  1024. },
  1025. (user, next) => {
  1026. if (!user) return next("User not found.");
  1027. if (user.favoriteStations.indexOf(payload.stationId) !== -1) return next(null, true);
  1028. return next(null, false);
  1029. }
  1030. ],
  1031. async (err, isStationFavorited) => {
  1032. if (err && err !== true) {
  1033. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1034. return reject(new Error(err));
  1035. }
  1036. return resolve(isStationFavorited);
  1037. }
  1038. );
  1039. });
  1040. }
  1041. /**
  1042. * Returns a list of sockets in a room that can and can't know about a station
  1043. *
  1044. * @param {object} payload - the payload object
  1045. * @param {object} payload.station - the station object
  1046. * @param {string} payload.room - the websockets room to get the sockets from
  1047. * @returns {Promise} - returns a promise (resolve, reject)
  1048. */
  1049. GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION(payload) {
  1050. return new Promise((resolve, reject) => {
  1051. WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: payload.room }, this)
  1052. .then(socketIds => {
  1053. const sockets = [];
  1054. async.eachLimit(
  1055. socketIds,
  1056. 1,
  1057. (socketId, next) => {
  1058. WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this)
  1059. .then(socket => {
  1060. if (socket) sockets.push(socket);
  1061. next();
  1062. })
  1063. .catch(err => {
  1064. reject(err);
  1065. });
  1066. },
  1067. err => {
  1068. if (err) reject(err);
  1069. else {
  1070. let socketsThatCan = [];
  1071. const socketsThatCannot = [];
  1072. if (payload.station.privacy === "public") {
  1073. socketsThatCan = sockets;
  1074. resolve({ socketsThatCan, socketsThatCannot });
  1075. } else {
  1076. async.eachLimit(
  1077. sockets,
  1078. 1,
  1079. (socket, next) => {
  1080. const { session } = socket;
  1081. async.waterfall(
  1082. [
  1083. next => {
  1084. if (!session.sessionId) next("No session id");
  1085. else next();
  1086. },
  1087. next => {
  1088. CacheModule.runJob(
  1089. "HGET",
  1090. {
  1091. table: "sessions",
  1092. key: session.sessionId
  1093. },
  1094. this
  1095. )
  1096. .then(response => {
  1097. next(null, response);
  1098. })
  1099. .catch(next);
  1100. },
  1101. (session, next) => {
  1102. if (!session) next("No session");
  1103. else {
  1104. DBModule.runJob("GET_MODEL", { modelName: "user" }, this)
  1105. .then(userModel => {
  1106. next(null, userModel);
  1107. })
  1108. .catch(next);
  1109. }
  1110. },
  1111. (userModel, next) => {
  1112. if (!userModel) next("No user model");
  1113. else
  1114. userModel.findOne(
  1115. {
  1116. _id: session.userId
  1117. },
  1118. next
  1119. );
  1120. },
  1121. (user, next) => {
  1122. if (!user) next("No user found");
  1123. else if (user.role === "admin") {
  1124. socketsThatCan.push(socket);
  1125. next();
  1126. } else if (
  1127. payload.station.type === "community" &&
  1128. payload.station.owner === session.userId
  1129. ) {
  1130. socketsThatCan.push(socket);
  1131. next();
  1132. }
  1133. }
  1134. ],
  1135. err => {
  1136. if (err) socketsThatCannot.push(socket);
  1137. next();
  1138. }
  1139. );
  1140. },
  1141. err => {
  1142. if (err) reject(err);
  1143. else resolve({ socketsThatCan, socketsThatCannot });
  1144. }
  1145. );
  1146. }
  1147. }
  1148. }
  1149. );
  1150. })
  1151. .catch(reject);
  1152. });
  1153. }
  1154. /**
  1155. * Adds a playlist to be included in a station
  1156. *
  1157. * @param {object} payload - object that contains the payload
  1158. * @param {object} payload.stationId - the id of the station to include the playlist in
  1159. * @param {object} payload.playlistId - the id of the playlist to be included
  1160. * @returns {Promise} - returns a promise (resolve, reject)
  1161. */
  1162. INCLUDE_PLAYLIST(payload) {
  1163. return new Promise((resolve, reject) => {
  1164. async.waterfall(
  1165. [
  1166. next => {
  1167. if (!payload.stationId) next("Please specify a station id");
  1168. else if (!payload.playlistId) next("Please specify a playlist id");
  1169. else next();
  1170. },
  1171. next => {
  1172. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1173. .then(station => {
  1174. next(null, station);
  1175. })
  1176. .catch(next);
  1177. },
  1178. (station, next) => {
  1179. if (station.playlist === payload.playlistId) next("You cannot include the station playlist");
  1180. else if (station.includedPlaylists.indexOf(payload.playlistId) !== -1)
  1181. next("This playlist is already included");
  1182. else if (station.excludedPlaylists.indexOf(payload.playlistId) !== -1)
  1183. next(
  1184. "This playlist is currently excluded, please remove it from there before including it"
  1185. );
  1186. else
  1187. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId: payload.playlistId }, this)
  1188. .then(() => {
  1189. next(null);
  1190. })
  1191. .catch(next);
  1192. },
  1193. next => {
  1194. DBModule.runJob(
  1195. "GET_MODEL",
  1196. {
  1197. modelName: "station"
  1198. },
  1199. this
  1200. ).then(stationModel => {
  1201. stationModel.updateOne(
  1202. { _id: payload.stationId },
  1203. { $push: { includedPlaylists: payload.playlistId } },
  1204. next
  1205. );
  1206. });
  1207. },
  1208. (res, next) => {
  1209. StationsModule.runJob(
  1210. "UPDATE_STATION",
  1211. {
  1212. stationId: payload.stationId
  1213. },
  1214. this
  1215. )
  1216. .then(() => {
  1217. next();
  1218. })
  1219. .catch(next);
  1220. }
  1221. ],
  1222. async err => {
  1223. if (err && err !== true) {
  1224. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1225. return reject(new Error(err));
  1226. }
  1227. return resolve();
  1228. }
  1229. );
  1230. });
  1231. }
  1232. /**
  1233. * Removes a playlist that is included in a station
  1234. *
  1235. * @param {object} payload - object that contains the payload
  1236. * @param {object} payload.stationId - the id of the station
  1237. * @param {object} payload.playlistId - the id of the playlist
  1238. * @returns {Promise} - returns a promise (resolve, reject)
  1239. */
  1240. REMOVE_INCLUDED_PLAYLIST(payload) {
  1241. return new Promise((resolve, reject) => {
  1242. async.waterfall(
  1243. [
  1244. next => {
  1245. if (!payload.stationId) next("Please specify a station id");
  1246. else if (!payload.playlistId) next("Please specify a playlist id");
  1247. else next();
  1248. },
  1249. next => {
  1250. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1251. .then(station => {
  1252. next(null, station);
  1253. })
  1254. .catch(next);
  1255. },
  1256. (station, next) => {
  1257. if (station.includedPlaylists.indexOf(payload.playlistId) === -1)
  1258. next("This playlist isn't included");
  1259. else next();
  1260. },
  1261. next => {
  1262. DBModule.runJob(
  1263. "GET_MODEL",
  1264. {
  1265. modelName: "station"
  1266. },
  1267. this
  1268. ).then(stationModel => {
  1269. stationModel.updateOne(
  1270. { _id: payload.stationId },
  1271. { $pull: { includedPlaylists: payload.playlistId } },
  1272. next
  1273. );
  1274. });
  1275. },
  1276. (res, next) => {
  1277. StationsModule.runJob(
  1278. "UPDATE_STATION",
  1279. {
  1280. stationId: payload.stationId
  1281. },
  1282. this
  1283. )
  1284. .then(() => {
  1285. next();
  1286. })
  1287. .catch(next);
  1288. }
  1289. ],
  1290. async err => {
  1291. if (err && err !== true) {
  1292. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1293. return reject(new Error(err));
  1294. }
  1295. return resolve();
  1296. }
  1297. );
  1298. });
  1299. }
  1300. /**
  1301. * Adds a playlist to be excluded in a station
  1302. *
  1303. * @param {object} payload - object that contains the payload
  1304. * @param {object} payload.stationId - the id of the station
  1305. * @param {object} payload.playlistId - the id of the playlist
  1306. * @returns {Promise} - returns a promise (resolve, reject)
  1307. */
  1308. EXCLUDE_PLAYLIST(payload) {
  1309. return new Promise((resolve, reject) => {
  1310. async.waterfall(
  1311. [
  1312. next => {
  1313. if (!payload.stationId) next("Please specify a station id");
  1314. else if (!payload.playlistId) next("Please specify a playlist id");
  1315. else next();
  1316. },
  1317. next => {
  1318. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1319. .then(station => {
  1320. next(null, station);
  1321. })
  1322. .catch(next);
  1323. },
  1324. (station, next) => {
  1325. if (station.playlist === payload.playlistId) next("You cannot exclude the station playlist");
  1326. else if (station.excludedPlaylists.indexOf(payload.playlistId) !== -1)
  1327. next("This playlist is already excluded");
  1328. else if (station.includedPlaylists.indexOf(payload.playlistId) !== -1)
  1329. next(
  1330. "This playlist is currently included, please remove it from there before excluding it"
  1331. );
  1332. else
  1333. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId: payload.playlistId }, this)
  1334. .then(() => {
  1335. next(null);
  1336. })
  1337. .catch(next);
  1338. },
  1339. next => {
  1340. DBModule.runJob(
  1341. "GET_MODEL",
  1342. {
  1343. modelName: "station"
  1344. },
  1345. this
  1346. ).then(stationModel => {
  1347. stationModel.updateOne(
  1348. { _id: payload.stationId },
  1349. { $push: { excludedPlaylists: payload.playlistId } },
  1350. next
  1351. );
  1352. });
  1353. },
  1354. (res, next) => {
  1355. StationsModule.runJob(
  1356. "UPDATE_STATION",
  1357. {
  1358. stationId: payload.stationId
  1359. },
  1360. this
  1361. )
  1362. .then(() => {
  1363. next();
  1364. })
  1365. .catch(next);
  1366. }
  1367. ],
  1368. async err => {
  1369. if (err && err !== true) {
  1370. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1371. return reject(new Error(err));
  1372. }
  1373. return resolve();
  1374. }
  1375. );
  1376. });
  1377. }
  1378. /**
  1379. * Removes a playlist that is excluded in a station
  1380. *
  1381. * @param {object} payload - object that contains the payload
  1382. * @param {object} payload.stationId - the id of the station
  1383. * @param {object} payload.playlistId - the id of the playlist
  1384. * @returns {Promise} - returns a promise (resolve, reject)
  1385. */
  1386. REMOVE_EXCLUDED_PLAYLIST(payload) {
  1387. return new Promise((resolve, reject) => {
  1388. async.waterfall(
  1389. [
  1390. next => {
  1391. if (!payload.stationId) next("Please specify a station id");
  1392. else if (!payload.playlistId) next("Please specify a playlist id");
  1393. else next();
  1394. },
  1395. next => {
  1396. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1397. .then(station => {
  1398. next(null, station);
  1399. })
  1400. .catch(next);
  1401. },
  1402. (station, next) => {
  1403. if (station.excludedPlaylists.indexOf(payload.playlistId) === -1)
  1404. next("This playlist isn't excluded");
  1405. else next();
  1406. },
  1407. next => {
  1408. DBModule.runJob(
  1409. "GET_MODEL",
  1410. {
  1411. modelName: "station"
  1412. },
  1413. this
  1414. ).then(stationModel => {
  1415. stationModel.updateOne(
  1416. { _id: payload.stationId },
  1417. { $pull: { excludedPlaylists: payload.playlistId } },
  1418. next
  1419. );
  1420. });
  1421. },
  1422. (res, next) => {
  1423. StationsModule.runJob(
  1424. "UPDATE_STATION",
  1425. {
  1426. stationId: payload.stationId
  1427. },
  1428. this
  1429. )
  1430. .then(() => {
  1431. next();
  1432. })
  1433. .catch(next);
  1434. }
  1435. ],
  1436. async err => {
  1437. if (err && err !== true) {
  1438. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1439. return reject(new Error(err));
  1440. }
  1441. return resolve();
  1442. }
  1443. );
  1444. });
  1445. }
  1446. /**
  1447. * Removes included or excluded playlist from a station
  1448. *
  1449. * @param {object} payload - object that contains the payload
  1450. * @param {string} payload.playlistId - the playlist id
  1451. * @returns {Promise} - returns promise (reject, resolve)
  1452. */
  1453. REMOVE_INCLUDED_OR_EXCLUDED_PLAYLIST_FROM_STATIONS(payload) {
  1454. return new Promise((resolve, reject) => {
  1455. async.waterfall(
  1456. [
  1457. next => {
  1458. if (!payload.playlistId) next("Please specify a playlist id");
  1459. else next();
  1460. },
  1461. next => {
  1462. StationsModule.stationModel.updateMany(
  1463. {
  1464. $or: [
  1465. { includedPlaylists: payload.playlistId },
  1466. { excludedPlaylists: payload.playlistId }
  1467. ]
  1468. },
  1469. {
  1470. $pull: {
  1471. includedPlaylists: payload.playlistId,
  1472. excludedPlaylists: payload.playlistId
  1473. }
  1474. },
  1475. err => {
  1476. if (err) next(err);
  1477. else next();
  1478. }
  1479. );
  1480. }
  1481. ],
  1482. async err => {
  1483. if (err && err !== true) {
  1484. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1485. return reject(new Error(err));
  1486. }
  1487. return resolve();
  1488. }
  1489. );
  1490. });
  1491. }
  1492. /**
  1493. * Gets stations that include or exclude a specific playlist
  1494. *
  1495. * @param {object} payload - object that contains the payload
  1496. * @param {string} payload.playlistId - the playlist id
  1497. * @returns {Promise} - returns promise (reject, resolve)
  1498. */
  1499. GET_STATIONS_THAT_INCLUDE_OR_EXCLUDE_PLAYLIST(payload) {
  1500. return new Promise((resolve, reject) => {
  1501. DBModule.runJob(
  1502. "GET_MODEL",
  1503. {
  1504. modelName: "station"
  1505. },
  1506. this
  1507. ).then(stationModel => {
  1508. stationModel.find(
  1509. {
  1510. $or: [{ includedPlaylists: payload.playlistId }, { excludedPlaylists: payload.playlistId }]
  1511. },
  1512. (err, stations) => {
  1513. if (err) reject(err);
  1514. else resolve({ stationIds: stations.map(station => station._id) });
  1515. }
  1516. );
  1517. });
  1518. });
  1519. }
  1520. /**
  1521. * Clears every queue
  1522. *
  1523. * @returns {Promise} - returns a promise (resolve, reject)
  1524. */
  1525. CLEAR_EVERY_STATION_QUEUE() {
  1526. return new Promise((resolve, reject) => {
  1527. async.waterfall(
  1528. [
  1529. next => {
  1530. StationsModule.stationModel.updateMany({}, { $set: { queue: [] } }, err => {
  1531. if (err) next(err);
  1532. else {
  1533. StationsModule.stationModel.find({}, (err, stations) => {
  1534. if (err) next(err);
  1535. else {
  1536. async.eachLimit(
  1537. stations,
  1538. 1,
  1539. (station, next) => {
  1540. StationsModule.runJob("UPDATE_STATION", {
  1541. stationId: station._id
  1542. })
  1543. .then(() => next())
  1544. .catch(next);
  1545. CacheModule.runJob("PUB", {
  1546. channel: "station.queueUpdate",
  1547. value: station._id
  1548. })
  1549. .then()
  1550. .catch();
  1551. },
  1552. next
  1553. );
  1554. }
  1555. });
  1556. }
  1557. });
  1558. }
  1559. ],
  1560. err => {
  1561. if (err) reject(err);
  1562. else resolve();
  1563. }
  1564. );
  1565. });
  1566. }
  1567. /**
  1568. * Clears and refills a station queue
  1569. *
  1570. * @param {object} payload - object that contains the payload
  1571. * @param {string} payload.stationId - the station id
  1572. * @returns {Promise} - returns a promise (resolve, reject)
  1573. */
  1574. CLEAR_AND_REFILL_STATION_QUEUE(payload) {
  1575. return new Promise((resolve, reject) => {
  1576. async.waterfall(
  1577. [
  1578. next => {
  1579. StationsModule.runJob(
  1580. "FILL_UP_STATION_QUEUE_FROM_STATION_PLAYLIST",
  1581. { stationId: payload.stationId, ignoreExistingQueue: true },
  1582. this
  1583. )
  1584. .then(() => {
  1585. CacheModule.runJob("PUB", {
  1586. channel: "station.queueUpdate",
  1587. value: payload.stationId
  1588. })
  1589. .then()
  1590. .catch();
  1591. next();
  1592. })
  1593. .catch(err => {
  1594. next(err);
  1595. });
  1596. }
  1597. ],
  1598. err => {
  1599. if (err) reject(err);
  1600. else resolve();
  1601. }
  1602. );
  1603. });
  1604. }
  1605. }
  1606. export default new _StationsModule();