stations.js 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let StationsModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let WSModule;
  8. let SongsModule;
  9. let PlaylistsModule;
  10. let NotificationsModule;
  11. class _StationsModule extends CoreClass {
  12. // eslint-disable-next-line require-jsdoc
  13. constructor() {
  14. super("stations");
  15. StationsModule = this;
  16. }
  17. /**
  18. * Initialises the stations module
  19. *
  20. * @returns {Promise} - returns promise (reject, resolve)
  21. */
  22. async initialize() {
  23. CacheModule = this.moduleManager.modules.cache;
  24. DBModule = this.moduleManager.modules.db;
  25. UtilsModule = this.moduleManager.modules.utils;
  26. WSModule = this.moduleManager.modules.ws;
  27. SongsModule = this.moduleManager.modules.songs;
  28. PlaylistsModule = this.moduleManager.modules.playlists;
  29. NotificationsModule = this.moduleManager.modules.notifications;
  30. this.userList = {};
  31. this.usersPerStation = {};
  32. this.usersPerStationCount = {};
  33. // TEMP
  34. CacheModule.runJob("SUB", {
  35. channel: "station.pause",
  36. cb: async stationId => {
  37. NotificationsModule.runJob("REMOVE", {
  38. subscription: `stations.nextSong?id=${stationId}`
  39. }).then();
  40. }
  41. });
  42. CacheModule.runJob("SUB", {
  43. channel: "station.resume",
  44. cb: async stationId => {
  45. StationsModule.runJob("INITIALIZE_STATION", { stationId }).then();
  46. }
  47. });
  48. CacheModule.runJob("SUB", {
  49. channel: "station.queueUpdate",
  50. cb: async stationId => {
  51. StationsModule.runJob("GET_STATION", { stationId }).then(station => {
  52. if (!station.currentSong && station.queue.length > 0) {
  53. StationsModule.runJob("INITIALIZE_STATION", {
  54. stationId
  55. }).then();
  56. }
  57. });
  58. }
  59. });
  60. CacheModule.runJob("SUB", {
  61. channel: "station.newOfficialPlaylist",
  62. cb: async stationId => {
  63. CacheModule.runJob("HGET", {
  64. table: "officialPlaylists",
  65. key: stationId
  66. }).then(playlistObj => {
  67. if (playlistObj) {
  68. WSModule.runJob("EMIT_TO_ROOM", {
  69. room: `station.${stationId}`,
  70. args: ["event:newOfficialPlaylist", { data: { playlist: playlistObj.songs } }]
  71. });
  72. }
  73. });
  74. }
  75. });
  76. const stationModel = (this.stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }));
  77. const stationSchema = (this.stationSchema = await CacheModule.runJob("GET_SCHEMA", { schemaName: "station" }));
  78. return new Promise((resolve, reject) =>
  79. async.waterfall(
  80. [
  81. next => {
  82. this.setStage(2);
  83. CacheModule.runJob("HGETALL", { table: "stations" })
  84. .then(stations => {
  85. next(null, stations);
  86. })
  87. .catch(next);
  88. },
  89. (stations, next) => {
  90. this.setStage(3);
  91. if (!stations) return next();
  92. const stationIds = Object.keys(stations);
  93. return async.each(
  94. stationIds,
  95. (stationId, next) => {
  96. stationModel.findOne({ _id: stationId }, (err, station) => {
  97. if (err) next(err);
  98. else if (!station) {
  99. CacheModule.runJob("HDEL", {
  100. table: "stations",
  101. key: stationId
  102. })
  103. .then(() => {
  104. next();
  105. })
  106. .catch(next);
  107. } else next();
  108. });
  109. },
  110. next
  111. );
  112. },
  113. next => {
  114. this.setStage(4);
  115. stationModel.find({}, next);
  116. },
  117. (stations, next) => {
  118. this.setStage(5);
  119. async.each(
  120. stations,
  121. (station, next2) => {
  122. async.waterfall(
  123. [
  124. next => {
  125. CacheModule.runJob("HSET", {
  126. table: "stations",
  127. key: station._id,
  128. value: stationSchema(station)
  129. })
  130. .then(station => next(null, station))
  131. .catch(next);
  132. },
  133. (station, next) => {
  134. StationsModule.runJob(
  135. "INITIALIZE_STATION",
  136. {
  137. stationId: station._id
  138. },
  139. null,
  140. -1
  141. )
  142. .then(() => {
  143. next();
  144. })
  145. .catch(next);
  146. }
  147. ],
  148. err => {
  149. next2(err);
  150. }
  151. );
  152. },
  153. next
  154. );
  155. }
  156. ],
  157. async err => {
  158. if (err) {
  159. err = await UtilsModule.runJob("GET_ERROR", {
  160. error: err
  161. });
  162. reject(new Error(err));
  163. } else {
  164. resolve();
  165. }
  166. }
  167. )
  168. );
  169. }
  170. /**
  171. * Initialises a station
  172. *
  173. * @param {object} payload - object that contains the payload
  174. * @param {string} payload.stationId - id of the station to initialise
  175. * @returns {Promise} - returns a promise (resolve, reject)
  176. */
  177. INITIALIZE_STATION(payload) {
  178. return new Promise((resolve, reject) => {
  179. async.waterfall(
  180. [
  181. next => {
  182. StationsModule.runJob(
  183. "GET_STATION",
  184. {
  185. stationId: payload.stationId
  186. },
  187. this
  188. )
  189. .then(station => {
  190. next(null, station);
  191. })
  192. .catch(next);
  193. },
  194. (station, next) => {
  195. if (!station) return next("Station not found.");
  196. return NotificationsModule.runJob(
  197. "UNSCHEDULE",
  198. {
  199. name: `stations.nextSong?id=${station._id}`
  200. },
  201. this
  202. )
  203. .then()
  204. .catch()
  205. .finally(() => {
  206. NotificationsModule.runJob("SUBSCRIBE", {
  207. name: `stations.nextSong?id=${station._id}`,
  208. cb: () =>
  209. StationsModule.runJob("SKIP_STATION", {
  210. stationId: station._id,
  211. natural: true
  212. }),
  213. unique: true,
  214. station
  215. })
  216. .then()
  217. .catch();
  218. if (station.paused) return next(true, station);
  219. return next(null, station);
  220. });
  221. },
  222. (station, next) => {
  223. if (!station.currentSong) {
  224. return StationsModule.runJob(
  225. "SKIP_STATION",
  226. {
  227. stationId: station._id,
  228. natural: false
  229. },
  230. this
  231. )
  232. .then(station => {
  233. next(true, station);
  234. })
  235. .catch(next)
  236. .finally(() => {});
  237. }
  238. let timeLeft =
  239. station.currentSong.duration * 1000 - (Date.now() - station.startedAt - station.timePaused);
  240. if (Number.isNaN(timeLeft)) timeLeft = -1;
  241. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  242. return StationsModule.runJob(
  243. "SKIP_STATION",
  244. {
  245. stationId: station._id,
  246. natural: false
  247. },
  248. this
  249. )
  250. .then(station => {
  251. next(null, station);
  252. })
  253. .catch(next);
  254. }
  255. // name, time, cb, station
  256. NotificationsModule.runJob("SCHEDULE", {
  257. name: `stations.nextSong?id=${station._id}`,
  258. time: timeLeft,
  259. station
  260. });
  261. return next(null, station);
  262. }
  263. ],
  264. async (err, station) => {
  265. if (err && err !== true) {
  266. err = await UtilsModule.runJob(
  267. "GET_ERROR",
  268. {
  269. error: err
  270. },
  271. this
  272. );
  273. reject(new Error(err));
  274. } else resolve(station);
  275. }
  276. );
  277. });
  278. }
  279. /**
  280. * Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  281. *
  282. * @param {object} payload - object that contains the payload
  283. * @param {string} payload.stationId - id of the station
  284. * @returns {Promise} - returns a promise (resolve, reject)
  285. */
  286. GET_STATION(payload) {
  287. return new Promise((resolve, reject) => {
  288. async.waterfall(
  289. [
  290. next => {
  291. CacheModule.runJob("HGET", { table: "stations", key: payload.stationId }, this)
  292. .then(station => next(null, station))
  293. .catch(next);
  294. },
  295. (station, next) => {
  296. if (station) return next(true, station);
  297. return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  298. },
  299. (station, next) => {
  300. if (station) {
  301. station = StationsModule.stationSchema(station);
  302. CacheModule.runJob("HSET", {
  303. table: "stations",
  304. key: payload.stationId,
  305. value: station
  306. })
  307. .then()
  308. .catch();
  309. next(true, station);
  310. } else next("Station not found");
  311. }
  312. ],
  313. async (err, station) => {
  314. if (err && err !== true) {
  315. err = await UtilsModule.runJob(
  316. "GET_ERROR",
  317. {
  318. error: err
  319. },
  320. this
  321. );
  322. reject(new Error(err));
  323. } else resolve(station);
  324. }
  325. );
  326. });
  327. }
  328. /**
  329. * Attempts to get a station by name, firstly from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  330. *
  331. * @param {object} payload - object that contains the payload
  332. * @param {string} payload.stationName - the unique name of the station
  333. * @returns {Promise} - returns a promise (resolve, reject)
  334. */
  335. async GET_STATION_BY_NAME(payload) {
  336. return new Promise((resolve, reject) =>
  337. async.waterfall(
  338. [
  339. next => {
  340. StationsModule.stationModel.findOne({ name: payload.stationName }, next);
  341. },
  342. (station, next) => {
  343. if (station) {
  344. station = StationsModule.stationSchema(station);
  345. CacheModule.runJob("HSET", {
  346. table: "stations",
  347. key: station._id,
  348. value: station
  349. });
  350. next(true, station);
  351. } else next("Station not found");
  352. }
  353. ],
  354. (err, station) => {
  355. if (err && err !== true) return reject(new Error(err));
  356. return resolve(station);
  357. }
  358. )
  359. );
  360. }
  361. /**
  362. * Gets stations data
  363. *
  364. * @param {object} payload - object containing the payload
  365. * @param {string} payload.page - the page
  366. * @param {string} payload.pageSize - the page size
  367. * @param {string} payload.properties - the properties to return for each station
  368. * @param {string} payload.sort - the sort object
  369. * @param {string} payload.queries - the queries array
  370. * @param {string} payload.operator - the operator for queries
  371. * @returns {Promise} - returns a promise (resolve, reject)
  372. */
  373. GET_DATA(payload) {
  374. return new Promise((resolve, reject) => {
  375. async.waterfall(
  376. [
  377. next => {
  378. const { queries, operator } = payload;
  379. const newQueries = queries.map(query => {
  380. const { data, filter, filterType } = query;
  381. const newQuery = {};
  382. if (filterType === "regex") {
  383. newQuery[filter.property] = new RegExp(`${data.slice(1, data.length - 1)}`, "i");
  384. } else if (filterType === "contains") {
  385. newQuery[filter.property] = new RegExp(
  386. `${data.replaceAll(/[.*+?^${}()|[\]\\]/g, "\\$&")}`,
  387. "i"
  388. );
  389. } else if (filterType === "exact") {
  390. newQuery[filter.property] = data.toString();
  391. } else if (filterType === "datetimeBefore") {
  392. newQuery[filter.property] = { $lte: new Date(data) };
  393. } else if (filterType === "datetimeAfter") {
  394. newQuery[filter.property] = { $gte: new Date(data) };
  395. }
  396. return newQuery;
  397. });
  398. const queryObject = {};
  399. if (newQueries.length > 0) {
  400. if (operator === "and") queryObject.$and = newQueries;
  401. else if (operator === "or") queryObject.$or = newQueries;
  402. else if (operator === "nor") queryObject.$nor = newQueries;
  403. }
  404. next(null, queryObject);
  405. },
  406. (queryObject, next) => {
  407. StationsModule.stationModel.find(queryObject).count((err, count) => {
  408. next(err, queryObject, count);
  409. });
  410. },
  411. (queryObject, count, next) => {
  412. const { page, pageSize, properties, sort } = payload;
  413. StationsModule.stationModel
  414. .find(queryObject)
  415. .sort(sort)
  416. .skip(pageSize * (page - 1))
  417. .limit(pageSize)
  418. .select(properties.join(" "))
  419. .exec((err, stations) => {
  420. next(err, count, stations);
  421. });
  422. }
  423. ],
  424. (err, count, stations) => {
  425. if (err && err !== true) return reject(new Error(err));
  426. return resolve({ data: stations, count });
  427. }
  428. );
  429. });
  430. }
  431. /**
  432. * Updates the station in cache from mongo or deletes station in cache if no longer in mongo.
  433. *
  434. * @param {object} payload - object that contains the payload
  435. * @param {string} payload.stationId - the id of the station to update
  436. * @returns {Promise} - returns a promise (resolve, reject)
  437. */
  438. UPDATE_STATION(payload) {
  439. return new Promise((resolve, reject) => {
  440. async.waterfall(
  441. [
  442. next => {
  443. StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
  444. },
  445. (station, next) => {
  446. if (!station) {
  447. CacheModule.runJob("HDEL", {
  448. table: "stations",
  449. key: payload.stationId
  450. })
  451. .then()
  452. .catch();
  453. return next("Station not found");
  454. }
  455. return CacheModule.runJob(
  456. "HSET",
  457. {
  458. table: "stations",
  459. key: payload.stationId,
  460. value: station
  461. },
  462. this
  463. )
  464. .then(station => {
  465. next(null, station);
  466. })
  467. .catch(next);
  468. }
  469. ],
  470. async (err, station) => {
  471. if (err && err !== true) {
  472. err = await UtilsModule.runJob(
  473. "GET_ERROR",
  474. {
  475. error: err
  476. },
  477. this
  478. );
  479. reject(new Error(err));
  480. } else resolve(station);
  481. }
  482. );
  483. });
  484. }
  485. /**
  486. * Fills up the official station playlist queue using the songs from the official station playlist
  487. *
  488. * @param {object} payload - object that contains the payload
  489. * @param {string} payload.stationId - the id of the station
  490. * @param {string} payload.ignoreExistingQueue - ignore the existing queue songs, replacing the old queue with a completely fresh one
  491. * @returns {Promise} - returns a promise (resolve, reject)
  492. */
  493. FILL_UP_STATION_QUEUE_FROM_STATION_PLAYLIST(payload) {
  494. return new Promise((resolve, reject) => {
  495. const { stationId, ignoreExistingQueue } = payload;
  496. async.waterfall(
  497. [
  498. next => {
  499. PlaylistsModule.runJob("GET_STATION_PLAYLIST", { stationId, includeSongs: true }, this)
  500. .then(response => {
  501. next(null, response.playlist);
  502. })
  503. .catch(next);
  504. },
  505. (playlist, next) => {
  506. StationsModule.runJob("GET_STATION", { stationId }, this)
  507. .then(station => {
  508. if (ignoreExistingQueue) station.queue = [];
  509. next(null, playlist, station);
  510. })
  511. .catch(next);
  512. },
  513. (playlist, station, next) => {
  514. if (station.playMode === "random") {
  515. UtilsModule.runJob("SHUFFLE", { array: playlist.songs }, this)
  516. .then(response => {
  517. next(null, response.array, station);
  518. })
  519. .catch(next);
  520. } else next(null, playlist.songs, station);
  521. },
  522. (_playlistSongs, station, next) => {
  523. let playlistSongs = JSON.parse(JSON.stringify(_playlistSongs));
  524. if (station.playMode === "sequential") {
  525. if (station.currentSongIndex <= playlistSongs.length) {
  526. const songsToAddToEnd = playlistSongs.splice(0, station.currentSongIndex);
  527. playlistSongs = [...playlistSongs, ...songsToAddToEnd];
  528. }
  529. }
  530. const songsStillNeeded = 50 - station.queue.length;
  531. const currentSongs = station.queue;
  532. const currentYoutubeIds = station.queue.map(song => song.youtubeId);
  533. const songsToAdd = [];
  534. let lastSongAdded = null;
  535. playlistSongs.every(song => {
  536. if (
  537. songsToAdd.length < songsStillNeeded &&
  538. currentYoutubeIds.indexOf(song.youtubeId) === -1
  539. ) {
  540. lastSongAdded = song;
  541. songsToAdd.push(song);
  542. return true;
  543. }
  544. if (songsToAdd.length >= songsStillNeeded) return false;
  545. return true;
  546. });
  547. let { currentSongIndex } = station;
  548. if (station.playMode === "sequential" && lastSongAdded) {
  549. const indexOfLastSong = _playlistSongs
  550. .map(song => song.youtubeId)
  551. .indexOf(lastSongAdded.youtubeId);
  552. if (indexOfLastSong !== -1) currentSongIndex = indexOfLastSong;
  553. }
  554. next(null, currentSongs, songsToAdd, currentSongIndex);
  555. },
  556. (currentSongs, songsToAdd, currentSongIndex, next) => {
  557. SongsModule.runJob("GET_SONGS", {
  558. songIds: songsToAdd.map(song => song._id),
  559. properties: [
  560. "youtubeId",
  561. "title",
  562. "duration",
  563. "skipDuration",
  564. "artists",
  565. "thumbnail",
  566. "status"
  567. ]
  568. })
  569. .then(response => {
  570. const newSongsToAdd = songsToAdd.map(song =>
  571. response.songs.find(newSong => newSong._id.toString() === song._id.toString())
  572. );
  573. next(null, currentSongs, newSongsToAdd, currentSongIndex);
  574. })
  575. .catch(err => next(err));
  576. },
  577. (currentSongs, songsToAdd, currentSongIndex, next) => {
  578. const newPlaylist = [...currentSongs, ...songsToAdd].map(song => {
  579. if (!song._id) song._id = null;
  580. return song;
  581. });
  582. next(null, newPlaylist, currentSongIndex);
  583. },
  584. (newPlaylist, currentSongIndex, next) => {
  585. StationsModule.stationModel.updateOne(
  586. { _id: stationId },
  587. { $set: { queue: newPlaylist, currentSongIndex } },
  588. { runValidators: true },
  589. err => {
  590. if (err) next(err);
  591. else
  592. StationsModule.runJob(
  593. "UPDATE_STATION",
  594. {
  595. stationId
  596. },
  597. this
  598. )
  599. .then(() => {
  600. next(null);
  601. })
  602. .catch(next);
  603. }
  604. );
  605. }
  606. ],
  607. err => {
  608. if (err) reject(err);
  609. else resolve();
  610. }
  611. );
  612. });
  613. }
  614. /**
  615. * Gets next station song
  616. *
  617. * @param {object} payload - object that contains the payload
  618. * @param {string} payload.stationId - the id of the station
  619. * @returns {Promise} - returns a promise (resolve, reject)
  620. */
  621. GET_NEXT_STATION_SONG(payload) {
  622. return new Promise((resolve, reject) => {
  623. const { stationId } = payload;
  624. async.waterfall(
  625. [
  626. next => {
  627. StationsModule.runJob("GET_STATION", { stationId }, this)
  628. .then(station => {
  629. next(null, station);
  630. })
  631. .catch(next);
  632. },
  633. (station, next) => {
  634. if (station.queue.length === 0) next("No songs available.");
  635. else {
  636. next(null, station.queue[0]);
  637. }
  638. },
  639. (queueSong, next) => {
  640. if (!queueSong._id) next(null, queueSong);
  641. else
  642. SongsModule.runJob("GET_SONG", { songId: queueSong._id }, this)
  643. .then(response => {
  644. const { song } = response;
  645. if (song) {
  646. const newSong = {
  647. _id: song._id,
  648. youtubeId: song.youtubeId,
  649. title: song.title,
  650. artists: song.artists,
  651. duration: song.duration,
  652. skipDuration: song.skipDuration,
  653. thumbnail: song.thumbnail,
  654. requestedAt: queueSong.requestedAt,
  655. requestedBy: queueSong.requestedBy,
  656. likes: song.likes,
  657. dislikes: song.dislikes,
  658. status: song.status
  659. };
  660. return next(null, newSong);
  661. }
  662. return next(null, song);
  663. })
  664. .catch(err => {
  665. next(err);
  666. });
  667. }
  668. ],
  669. (err, song) => {
  670. if (err) console.log(33333, err, payload);
  671. if (err) reject(err);
  672. else resolve({ song });
  673. }
  674. );
  675. });
  676. }
  677. /**
  678. * Removes first station queue song
  679. *
  680. * @param {object} payload - object that contains the payload
  681. * @param {string} payload.stationId - the id of the station
  682. * @returns {Promise} - returns a promise (resolve, reject)
  683. */
  684. REMOVE_FIRST_QUEUE_SONG(payload) {
  685. return new Promise((resolve, reject) => {
  686. const { stationId } = payload;
  687. async.waterfall(
  688. [
  689. next => {
  690. StationsModule.stationModel.updateOne(
  691. { _id: stationId },
  692. { $pop: { queue: -1 } },
  693. { runValidators: true },
  694. err => {
  695. if (err) next(err);
  696. else
  697. StationsModule.runJob(
  698. "UPDATE_STATION",
  699. {
  700. stationId
  701. },
  702. this
  703. )
  704. .then(() => {
  705. next(null);
  706. })
  707. .catch(next);
  708. }
  709. );
  710. }
  711. ],
  712. err => {
  713. if (err) reject(err);
  714. else resolve();
  715. }
  716. );
  717. });
  718. }
  719. /**
  720. * Skips a station
  721. *
  722. * @param {object} payload - object that contains the payload
  723. * @param {string} payload.stationId - the id of the station to skip
  724. * @param {string} payload.natural - whether to skip naturally or forcefully
  725. * @returns {Promise} - returns a promise (resolve, reject)
  726. */
  727. SKIP_STATION(payload) {
  728. return new Promise((resolve, reject) => {
  729. StationsModule.log("INFO", `Skipping station ${payload.stationId}.`);
  730. StationsModule.log("STATION_ISSUE", `SKIP_STATION_CB - Station ID: ${payload.stationId}.`);
  731. async.waterfall(
  732. [
  733. // Clears up any existing timers that would skip the station if the song ends
  734. next => {
  735. NotificationsModule.runJob("UNSCHEDULE", {
  736. name: `stations.nextSong?id=${payload.stationId}`
  737. })
  738. .then(() => {
  739. next();
  740. })
  741. .catch(next);
  742. },
  743. // Gets the station object
  744. next => {
  745. StationsModule.runJob(
  746. "GET_STATION",
  747. {
  748. stationId: payload.stationId
  749. },
  750. this
  751. )
  752. .then(station => next(null, station))
  753. .catch(next);
  754. },
  755. // eslint-disable-next-line consistent-return
  756. (station, next) => {
  757. if (!station) return next("Station not found.");
  758. if (station.type === "community" && station.partyMode && station.queue.length === 0)
  759. return next(null, null, station); // Community station with party mode enabled and no songs in the queue
  760. if (station.type === "community" && station.partyMode && station.queue.length > 0) {
  761. // Community station with party mode enabled and songs in the queue
  762. if (station.paused) return next(null, null, station);
  763. StationsModule.runJob("GET_NEXT_STATION_SONG", { stationId: station._id }, this)
  764. .then(response => {
  765. StationsModule.runJob(
  766. "REMOVE_FIRST_QUEUE_SONG",
  767. { stationId: station._id },
  768. this
  769. ).then(() => {
  770. next(null, response.song, station);
  771. });
  772. })
  773. .catch(err => {
  774. if (err === "No songs available.") next(null, null, station);
  775. else next(err);
  776. });
  777. }
  778. if (station.type === "community" && !station.partyMode) {
  779. StationsModule.runJob(
  780. "FILL_UP_STATION_QUEUE_FROM_STATION_PLAYLIST",
  781. { stationId: station._id },
  782. this
  783. )
  784. .then(() => {
  785. StationsModule.runJob("GET_NEXT_STATION_SONG", { stationId: station._id }, this)
  786. .then(response => {
  787. StationsModule.runJob(
  788. "REMOVE_FIRST_QUEUE_SONG",
  789. { stationId: station._id },
  790. this
  791. ).then(() => {
  792. next(null, response.song, station);
  793. });
  794. })
  795. .catch(err => {
  796. if (err === "No songs available.") next(null, null, station);
  797. else next(err);
  798. });
  799. })
  800. .catch(next);
  801. }
  802. if (station.type === "official") {
  803. StationsModule.runJob(
  804. "FILL_UP_STATION_QUEUE_FROM_STATION_PLAYLIST",
  805. { stationId: station._id },
  806. this
  807. )
  808. .then(() => {
  809. StationsModule.runJob("GET_NEXT_STATION_SONG", { stationId: station._id }, this)
  810. .then(response => {
  811. StationsModule.runJob(
  812. "REMOVE_FIRST_QUEUE_SONG",
  813. { stationId: station._id },
  814. this
  815. )
  816. .then(() => {
  817. next(null, response.song, station);
  818. })
  819. .catch(next);
  820. })
  821. .catch(err => {
  822. if (err === "No songs available.") next(null, null, station);
  823. else next(err);
  824. });
  825. })
  826. .catch(next);
  827. }
  828. },
  829. (song, station, next) => {
  830. const $set = {};
  831. if (song === null) $set.currentSong = null;
  832. else {
  833. $set.currentSong = {
  834. _id: song._id,
  835. youtubeId: song.youtubeId,
  836. title: song.title,
  837. artists: song.artists,
  838. duration: song.duration,
  839. skipDuration: song.skipDuration,
  840. thumbnail: song.thumbnail,
  841. requestedAt: song.requestedAt,
  842. requestedBy: song.requestedBy,
  843. status: song.status
  844. };
  845. }
  846. $set.startedAt = Date.now();
  847. $set.timePaused = 0;
  848. if (station.paused) $set.pausedAt = Date.now();
  849. next(null, $set, song, station);
  850. },
  851. ($set, song, station, next) => {
  852. StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, err => {
  853. if (err) return next(err);
  854. return StationsModule.runJob("UPDATE_STATION", { stationId: station._id }, this)
  855. .then(station => {
  856. CacheModule.runJob("PUB", {
  857. channel: "station.queueUpdate",
  858. value: payload.stationId
  859. })
  860. .then()
  861. .catch();
  862. next(null, station, song);
  863. })
  864. .catch(next);
  865. });
  866. },
  867. (station, song, next) => {
  868. if (station.currentSong !== null && station.currentSong.youtubeId !== undefined) {
  869. station.currentSong.likes = song.likes;
  870. station.currentSong.dislikes = song.dislikes;
  871. station.currentSong.skipVotes = 0;
  872. }
  873. next(null, station);
  874. }
  875. ],
  876. async (err, station) => {
  877. if (err) {
  878. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  879. StationsModule.log("ERROR", `Skipping station "${payload.stationId}" failed. "${err}"`);
  880. return reject(new Error(err));
  881. }
  882. // TODO Pub/Sub this
  883. const { currentSong } = station;
  884. WSModule.runJob("EMIT_TO_ROOM", {
  885. room: `station.${station._id}`,
  886. args: [
  887. "event:station.nextSong",
  888. {
  889. data: {
  890. currentSong,
  891. startedAt: station.startedAt,
  892. paused: station.paused,
  893. timePaused: 0,
  894. natural: payload.natural
  895. }
  896. }
  897. ]
  898. });
  899. WSModule.runJob("EMIT_TO_ROOM", {
  900. room: `manage-station.${station._id}`,
  901. args: ["event:station.nextSong", { data: { stationId: station._id, currentSong } }]
  902. });
  903. if (station.privacy === "public")
  904. WSModule.runJob("EMIT_TO_ROOM", {
  905. room: "home",
  906. args: ["event:station.nextSong", { data: { stationId: station._id, currentSong } }]
  907. });
  908. else {
  909. const sockets = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: "home" }, this);
  910. sockets.forEach(async socketId => {
  911. const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId });
  912. if (!socket) return;
  913. const { session } = socket;
  914. if (session.sessionId) {
  915. CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId }).then(
  916. session => {
  917. if (session) {
  918. DBModule.runJob("GET_MODEL", { modelName: "user" }).then(userModel => {
  919. userModel.findOne({ _id: session.userId }, (err, user) => {
  920. if (!err && user) {
  921. if (user.role === "admin")
  922. socket.dispatch("event:station.nextSong", {
  923. data: {
  924. stationId: station._id,
  925. currentSong
  926. }
  927. });
  928. else if (
  929. station.type === "community" &&
  930. station.owner === session.userId
  931. )
  932. socket.dispatch("event:station.nextSong", {
  933. data: {
  934. stationId: station._id,
  935. currentSong
  936. }
  937. });
  938. }
  939. });
  940. });
  941. }
  942. }
  943. );
  944. }
  945. });
  946. }
  947. WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: `station.${station._id}` }).then(sockets => {
  948. if (station.currentSong !== null && station.currentSong.youtubeId !== undefined) {
  949. WSModule.runJob("SOCKETS_JOIN_SONG_ROOM", {
  950. sockets,
  951. room: `song.${station.currentSong.youtubeId}`
  952. });
  953. if (!station.paused) {
  954. NotificationsModule.runJob("SCHEDULE", {
  955. name: `stations.nextSong?id=${station._id}`,
  956. time: station.currentSong.duration * 1000,
  957. station
  958. });
  959. }
  960. } else WSModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", { sockets });
  961. });
  962. return resolve({ station });
  963. }
  964. );
  965. });
  966. }
  967. /**
  968. * Checks if a user can view/access a station
  969. *
  970. * @param {object} payload - object that contains the payload
  971. * @param {object} payload.station - the station object of the station in question
  972. * @param {string} payload.userId - the id of the user in question
  973. * @param {boolean} payload.hideUnlisted - whether the user is allowed to see unlisted stations or not
  974. * @returns {Promise} - returns a promise (resolve, reject)
  975. */
  976. CAN_USER_VIEW_STATION(payload) {
  977. return new Promise((resolve, reject) => {
  978. async.waterfall(
  979. [
  980. next => {
  981. if (payload.station.privacy === "public") return next(true);
  982. if (payload.station.privacy === "unlisted")
  983. if (payload.hideUnlisted === true) return next();
  984. else return next(true);
  985. if (!payload.userId) return next("Not allowed");
  986. return next();
  987. },
  988. next => {
  989. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(userModel => {
  990. userModel.findOne({ _id: payload.userId }, next);
  991. });
  992. },
  993. (user, next) => {
  994. if (!user) return next("Not allowed");
  995. if (user.role === "admin") return next(true);
  996. if (payload.station.type === "official") return next("Not allowed");
  997. if (payload.station.owner === payload.userId) return next(true);
  998. return next("Not allowed");
  999. }
  1000. ],
  1001. async errOrResult => {
  1002. if (errOrResult !== true && errOrResult !== "Not allowed") {
  1003. errOrResult = await UtilsModule.runJob(
  1004. "GET_ERROR",
  1005. {
  1006. error: errOrResult
  1007. },
  1008. this
  1009. );
  1010. reject(new Error(errOrResult));
  1011. } else {
  1012. resolve(errOrResult === true);
  1013. }
  1014. }
  1015. );
  1016. });
  1017. }
  1018. /**
  1019. * Checks if a user has favorited a station or not
  1020. *
  1021. * @param {object} payload - object that contains the payload
  1022. * @param {object} payload.stationId - the id of the station in question
  1023. * @param {string} payload.userId - the id of the user in question
  1024. * @returns {Promise} - returns a promise (resolve, reject)
  1025. */
  1026. HAS_USER_FAVORITED_STATION(payload) {
  1027. return new Promise((resolve, reject) => {
  1028. async.waterfall(
  1029. [
  1030. next => {
  1031. DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(userModel => {
  1032. userModel.findOne({ _id: payload.userId }, next);
  1033. });
  1034. },
  1035. (user, next) => {
  1036. if (!user) return next("User not found.");
  1037. if (user.favoriteStations.indexOf(payload.stationId) !== -1) return next(null, true);
  1038. return next(null, false);
  1039. }
  1040. ],
  1041. async (err, isStationFavorited) => {
  1042. if (err && err !== true) {
  1043. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1044. return reject(new Error(err));
  1045. }
  1046. return resolve(isStationFavorited);
  1047. }
  1048. );
  1049. });
  1050. }
  1051. /**
  1052. * Returns a list of sockets in a room that can and can't know about a station
  1053. *
  1054. * @param {object} payload - the payload object
  1055. * @param {object} payload.station - the station object
  1056. * @param {string} payload.room - the websockets room to get the sockets from
  1057. * @returns {Promise} - returns a promise (resolve, reject)
  1058. */
  1059. GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION(payload) {
  1060. return new Promise((resolve, reject) => {
  1061. WSModule.runJob("GET_SOCKETS_FOR_ROOM", { room: payload.room }, this)
  1062. .then(socketIds => {
  1063. const sockets = [];
  1064. async.eachLimit(
  1065. socketIds,
  1066. 1,
  1067. (socketId, next) => {
  1068. WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this)
  1069. .then(socket => {
  1070. if (socket) sockets.push(socket);
  1071. next();
  1072. })
  1073. .catch(err => {
  1074. reject(err);
  1075. });
  1076. },
  1077. err => {
  1078. if (err) reject(err);
  1079. else {
  1080. let socketsThatCan = [];
  1081. const socketsThatCannot = [];
  1082. if (payload.station.privacy === "public") {
  1083. socketsThatCan = sockets;
  1084. resolve({ socketsThatCan, socketsThatCannot });
  1085. } else {
  1086. async.eachLimit(
  1087. sockets,
  1088. 1,
  1089. (socket, next) => {
  1090. const { session } = socket;
  1091. async.waterfall(
  1092. [
  1093. next => {
  1094. if (!session.sessionId) next("No session id");
  1095. else next();
  1096. },
  1097. next => {
  1098. CacheModule.runJob(
  1099. "HGET",
  1100. {
  1101. table: "sessions",
  1102. key: session.sessionId
  1103. },
  1104. this
  1105. )
  1106. .then(response => {
  1107. next(null, response);
  1108. })
  1109. .catch(next);
  1110. },
  1111. (session, next) => {
  1112. if (!session) next("No session");
  1113. else {
  1114. DBModule.runJob("GET_MODEL", { modelName: "user" }, this)
  1115. .then(userModel => {
  1116. next(null, userModel);
  1117. })
  1118. .catch(next);
  1119. }
  1120. },
  1121. (userModel, next) => {
  1122. if (!userModel) next("No user model");
  1123. else
  1124. userModel.findOne(
  1125. {
  1126. _id: session.userId
  1127. },
  1128. next
  1129. );
  1130. },
  1131. (user, next) => {
  1132. if (!user) next("No user found");
  1133. else if (user.role === "admin") {
  1134. socketsThatCan.push(socket);
  1135. next();
  1136. } else if (
  1137. payload.station.type === "community" &&
  1138. payload.station.owner === session.userId
  1139. ) {
  1140. socketsThatCan.push(socket);
  1141. next();
  1142. }
  1143. }
  1144. ],
  1145. err => {
  1146. if (err) socketsThatCannot.push(socket);
  1147. next();
  1148. }
  1149. );
  1150. },
  1151. err => {
  1152. if (err) reject(err);
  1153. else resolve({ socketsThatCan, socketsThatCannot });
  1154. }
  1155. );
  1156. }
  1157. }
  1158. }
  1159. );
  1160. })
  1161. .catch(reject);
  1162. });
  1163. }
  1164. /**
  1165. * Adds a playlist to be included in a station
  1166. *
  1167. * @param {object} payload - object that contains the payload
  1168. * @param {object} payload.stationId - the id of the station to include the playlist in
  1169. * @param {object} payload.playlistId - the id of the playlist to be included
  1170. * @returns {Promise} - returns a promise (resolve, reject)
  1171. */
  1172. INCLUDE_PLAYLIST(payload) {
  1173. return new Promise((resolve, reject) => {
  1174. async.waterfall(
  1175. [
  1176. next => {
  1177. if (!payload.stationId) next("Please specify a station id");
  1178. else if (!payload.playlistId) next("Please specify a playlist id");
  1179. else next();
  1180. },
  1181. next => {
  1182. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1183. .then(station => {
  1184. next(null, station);
  1185. })
  1186. .catch(next);
  1187. },
  1188. (station, next) => {
  1189. if (station.playlist === payload.playlistId) next("You cannot include the station playlist");
  1190. else if (station.includedPlaylists.indexOf(payload.playlistId) !== -1)
  1191. next("This playlist is already included");
  1192. else if (station.excludedPlaylists.indexOf(payload.playlistId) !== -1)
  1193. next(
  1194. "This playlist is currently excluded, please remove it from there before including it"
  1195. );
  1196. else
  1197. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId: payload.playlistId }, this)
  1198. .then(() => {
  1199. next(null);
  1200. })
  1201. .catch(next);
  1202. },
  1203. next => {
  1204. DBModule.runJob(
  1205. "GET_MODEL",
  1206. {
  1207. modelName: "station"
  1208. },
  1209. this
  1210. ).then(stationModel => {
  1211. stationModel.updateOne(
  1212. { _id: payload.stationId },
  1213. { $push: { includedPlaylists: payload.playlistId } },
  1214. next
  1215. );
  1216. });
  1217. },
  1218. (res, next) => {
  1219. StationsModule.runJob(
  1220. "UPDATE_STATION",
  1221. {
  1222. stationId: payload.stationId
  1223. },
  1224. this
  1225. )
  1226. .then(() => {
  1227. next();
  1228. })
  1229. .catch(next);
  1230. }
  1231. ],
  1232. async err => {
  1233. if (err && err !== true) {
  1234. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1235. return reject(new Error(err));
  1236. }
  1237. return resolve();
  1238. }
  1239. );
  1240. });
  1241. }
  1242. /**
  1243. * Removes a playlist that is included in a station
  1244. *
  1245. * @param {object} payload - object that contains the payload
  1246. * @param {object} payload.stationId - the id of the station
  1247. * @param {object} payload.playlistId - the id of the playlist
  1248. * @returns {Promise} - returns a promise (resolve, reject)
  1249. */
  1250. REMOVE_INCLUDED_PLAYLIST(payload) {
  1251. return new Promise((resolve, reject) => {
  1252. async.waterfall(
  1253. [
  1254. next => {
  1255. if (!payload.stationId) next("Please specify a station id");
  1256. else if (!payload.playlistId) next("Please specify a playlist id");
  1257. else next();
  1258. },
  1259. next => {
  1260. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1261. .then(station => {
  1262. next(null, station);
  1263. })
  1264. .catch(next);
  1265. },
  1266. (station, next) => {
  1267. if (station.includedPlaylists.indexOf(payload.playlistId) === -1)
  1268. next("This playlist isn't included");
  1269. else next();
  1270. },
  1271. next => {
  1272. DBModule.runJob(
  1273. "GET_MODEL",
  1274. {
  1275. modelName: "station"
  1276. },
  1277. this
  1278. ).then(stationModel => {
  1279. stationModel.updateOne(
  1280. { _id: payload.stationId },
  1281. { $pull: { includedPlaylists: payload.playlistId } },
  1282. next
  1283. );
  1284. });
  1285. },
  1286. (res, next) => {
  1287. StationsModule.runJob(
  1288. "UPDATE_STATION",
  1289. {
  1290. stationId: payload.stationId
  1291. },
  1292. this
  1293. )
  1294. .then(() => {
  1295. next();
  1296. })
  1297. .catch(next);
  1298. }
  1299. ],
  1300. async err => {
  1301. if (err && err !== true) {
  1302. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1303. return reject(new Error(err));
  1304. }
  1305. return resolve();
  1306. }
  1307. );
  1308. });
  1309. }
  1310. /**
  1311. * Adds a playlist to be excluded in a station
  1312. *
  1313. * @param {object} payload - object that contains the payload
  1314. * @param {object} payload.stationId - the id of the station
  1315. * @param {object} payload.playlistId - the id of the playlist
  1316. * @returns {Promise} - returns a promise (resolve, reject)
  1317. */
  1318. EXCLUDE_PLAYLIST(payload) {
  1319. return new Promise((resolve, reject) => {
  1320. async.waterfall(
  1321. [
  1322. next => {
  1323. if (!payload.stationId) next("Please specify a station id");
  1324. else if (!payload.playlistId) next("Please specify a playlist id");
  1325. else next();
  1326. },
  1327. next => {
  1328. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1329. .then(station => {
  1330. next(null, station);
  1331. })
  1332. .catch(next);
  1333. },
  1334. (station, next) => {
  1335. if (station.playlist === payload.playlistId) next("You cannot exclude the station playlist");
  1336. else if (station.excludedPlaylists.indexOf(payload.playlistId) !== -1)
  1337. next("This playlist is already excluded");
  1338. else if (station.includedPlaylists.indexOf(payload.playlistId) !== -1)
  1339. next(
  1340. "This playlist is currently included, please remove it from there before excluding it"
  1341. );
  1342. else
  1343. PlaylistsModule.runJob("GET_PLAYLIST", { playlistId: payload.playlistId }, this)
  1344. .then(() => {
  1345. next(null);
  1346. })
  1347. .catch(next);
  1348. },
  1349. next => {
  1350. DBModule.runJob(
  1351. "GET_MODEL",
  1352. {
  1353. modelName: "station"
  1354. },
  1355. this
  1356. ).then(stationModel => {
  1357. stationModel.updateOne(
  1358. { _id: payload.stationId },
  1359. { $push: { excludedPlaylists: payload.playlistId } },
  1360. next
  1361. );
  1362. });
  1363. },
  1364. (res, next) => {
  1365. StationsModule.runJob(
  1366. "UPDATE_STATION",
  1367. {
  1368. stationId: payload.stationId
  1369. },
  1370. this
  1371. )
  1372. .then(() => {
  1373. next();
  1374. })
  1375. .catch(next);
  1376. }
  1377. ],
  1378. async err => {
  1379. if (err && err !== true) {
  1380. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1381. return reject(new Error(err));
  1382. }
  1383. return resolve();
  1384. }
  1385. );
  1386. });
  1387. }
  1388. /**
  1389. * Removes a playlist that is excluded in a station
  1390. *
  1391. * @param {object} payload - object that contains the payload
  1392. * @param {object} payload.stationId - the id of the station
  1393. * @param {object} payload.playlistId - the id of the playlist
  1394. * @returns {Promise} - returns a promise (resolve, reject)
  1395. */
  1396. REMOVE_EXCLUDED_PLAYLIST(payload) {
  1397. return new Promise((resolve, reject) => {
  1398. async.waterfall(
  1399. [
  1400. next => {
  1401. if (!payload.stationId) next("Please specify a station id");
  1402. else if (!payload.playlistId) next("Please specify a playlist id");
  1403. else next();
  1404. },
  1405. next => {
  1406. StationsModule.runJob("GET_STATION", { stationId: payload.stationId }, this)
  1407. .then(station => {
  1408. next(null, station);
  1409. })
  1410. .catch(next);
  1411. },
  1412. (station, next) => {
  1413. if (station.excludedPlaylists.indexOf(payload.playlistId) === -1)
  1414. next("This playlist isn't excluded");
  1415. else next();
  1416. },
  1417. next => {
  1418. DBModule.runJob(
  1419. "GET_MODEL",
  1420. {
  1421. modelName: "station"
  1422. },
  1423. this
  1424. ).then(stationModel => {
  1425. stationModel.updateOne(
  1426. { _id: payload.stationId },
  1427. { $pull: { excludedPlaylists: payload.playlistId } },
  1428. next
  1429. );
  1430. });
  1431. },
  1432. (res, next) => {
  1433. StationsModule.runJob(
  1434. "UPDATE_STATION",
  1435. {
  1436. stationId: payload.stationId
  1437. },
  1438. this
  1439. )
  1440. .then(() => {
  1441. next();
  1442. })
  1443. .catch(next);
  1444. }
  1445. ],
  1446. async err => {
  1447. if (err && err !== true) {
  1448. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1449. return reject(new Error(err));
  1450. }
  1451. return resolve();
  1452. }
  1453. );
  1454. });
  1455. }
  1456. /**
  1457. * Removes included or excluded playlist from a station
  1458. *
  1459. * @param {object} payload - object that contains the payload
  1460. * @param {string} payload.playlistId - the playlist id
  1461. * @returns {Promise} - returns promise (reject, resolve)
  1462. */
  1463. REMOVE_INCLUDED_OR_EXCLUDED_PLAYLIST_FROM_STATIONS(payload) {
  1464. return new Promise((resolve, reject) => {
  1465. async.waterfall(
  1466. [
  1467. next => {
  1468. if (!payload.playlistId) next("Please specify a playlist id");
  1469. else next();
  1470. },
  1471. next => {
  1472. StationsModule.stationModel.updateMany(
  1473. {
  1474. $or: [
  1475. { includedPlaylists: payload.playlistId },
  1476. { excludedPlaylists: payload.playlistId }
  1477. ]
  1478. },
  1479. {
  1480. $pull: {
  1481. includedPlaylists: payload.playlistId,
  1482. excludedPlaylists: payload.playlistId
  1483. }
  1484. },
  1485. err => {
  1486. if (err) next(err);
  1487. else next();
  1488. }
  1489. );
  1490. }
  1491. ],
  1492. async err => {
  1493. if (err && err !== true) {
  1494. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  1495. return reject(new Error(err));
  1496. }
  1497. return resolve();
  1498. }
  1499. );
  1500. });
  1501. }
  1502. /**
  1503. * Gets stations that include or exclude a specific playlist
  1504. *
  1505. * @param {object} payload - object that contains the payload
  1506. * @param {string} payload.playlistId - the playlist id
  1507. * @returns {Promise} - returns promise (reject, resolve)
  1508. */
  1509. GET_STATIONS_THAT_INCLUDE_OR_EXCLUDE_PLAYLIST(payload) {
  1510. return new Promise((resolve, reject) => {
  1511. DBModule.runJob(
  1512. "GET_MODEL",
  1513. {
  1514. modelName: "station"
  1515. },
  1516. this
  1517. ).then(stationModel => {
  1518. stationModel.find(
  1519. {
  1520. $or: [{ includedPlaylists: payload.playlistId }, { excludedPlaylists: payload.playlistId }]
  1521. },
  1522. (err, stations) => {
  1523. if (err) reject(err);
  1524. else resolve({ stationIds: stations.map(station => station._id) });
  1525. }
  1526. );
  1527. });
  1528. });
  1529. }
  1530. /**
  1531. * Clears every queue
  1532. *
  1533. * @returns {Promise} - returns a promise (resolve, reject)
  1534. */
  1535. CLEAR_EVERY_STATION_QUEUE() {
  1536. return new Promise((resolve, reject) => {
  1537. async.waterfall(
  1538. [
  1539. next => {
  1540. StationsModule.stationModel.updateMany({}, { $set: { queue: [] } }, err => {
  1541. if (err) next(err);
  1542. else {
  1543. StationsModule.stationModel.find({}, (err, stations) => {
  1544. if (err) next(err);
  1545. else {
  1546. async.eachLimit(
  1547. stations,
  1548. 1,
  1549. (station, next) => {
  1550. StationsModule.runJob("UPDATE_STATION", {
  1551. stationId: station._id
  1552. })
  1553. .then(() => next())
  1554. .catch(next);
  1555. CacheModule.runJob("PUB", {
  1556. channel: "station.queueUpdate",
  1557. value: station._id
  1558. })
  1559. .then()
  1560. .catch();
  1561. },
  1562. next
  1563. );
  1564. }
  1565. });
  1566. }
  1567. });
  1568. }
  1569. ],
  1570. err => {
  1571. if (err) reject(err);
  1572. else resolve();
  1573. }
  1574. );
  1575. });
  1576. }
  1577. /**
  1578. * Clears and refills a station queue
  1579. *
  1580. * @param {object} payload - object that contains the payload
  1581. * @param {string} payload.stationId - the station id
  1582. * @returns {Promise} - returns a promise (resolve, reject)
  1583. */
  1584. CLEAR_AND_REFILL_STATION_QUEUE(payload) {
  1585. return new Promise((resolve, reject) => {
  1586. async.waterfall(
  1587. [
  1588. next => {
  1589. StationsModule.runJob(
  1590. "FILL_UP_STATION_QUEUE_FROM_STATION_PLAYLIST",
  1591. { stationId: payload.stationId, ignoreExistingQueue: true },
  1592. this
  1593. )
  1594. .then(() => {
  1595. CacheModule.runJob("PUB", {
  1596. channel: "station.queueUpdate",
  1597. value: payload.stationId
  1598. })
  1599. .then()
  1600. .catch();
  1601. next();
  1602. })
  1603. .catch(err => {
  1604. next(err);
  1605. });
  1606. }
  1607. ],
  1608. err => {
  1609. if (err) reject(err);
  1610. else resolve();
  1611. }
  1612. );
  1613. });
  1614. }
  1615. }
  1616. export default new _StationsModule();