stations.js 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218
  1. 'use strict';
  2. const async = require('async'),
  3. request = require('request'),
  4. config = require('config'),
  5. _ = require('underscore')._;
  6. const hooks = require('./hooks');
  7. const moduleManager = require("../../index");
  8. const db = moduleManager.modules["db"];
  9. const cache = moduleManager.modules["cache"];
  10. const notifications = moduleManager.modules["notifications"];
  11. const utils = moduleManager.modules["utils"];
  12. const logger = moduleManager.modules["logger"];
  13. const stations = moduleManager.modules["stations"];
  14. const songs = moduleManager.modules["songs"];
  // socket id -> id of the station that socket joined (written by `join`, cleared by `leave` and by the recount below)
  let userList = {};
  // station id -> usernames found in that station by the most recent recount
  let usersPerStation = {};
  // station id -> number of sockets found in that station by the most recent recount
  let usersPerStationCount = {};

  /**
   * Every 3000 ms, rebuilds `usersPerStation` and `usersPerStationCount` from `userList`.
   *
   * For each tracked socket it checks that the socket still exists and is still in the
   * `station.<id>` room; stale entries are dropped from `userList`. Live sockets are counted and
   * their username is resolved through the cached session and the user model.
   * When a station's count or username list differs from the previous recount, the station id is
   * published on `station.updateUserCount` / `station.updateUsers`.
   */
  setInterval(() => {
    const stationsCountUpdated = [];
    const stationsUpdated = [];
    const markOnce = (list, stationId) => {
      if (list.indexOf(stationId) === -1) list.push(stationId);
    };

    // Keep local references so this recount always writes into (and compares) its own maps.
    const previousUsers = usersPerStation;
    const previousCounts = usersPerStationCount;
    const currentUsers = {};
    const currentCounts = {};
    usersPerStation = currentUsers;
    usersPerStationCount = currentCounts;

    async.each(Object.keys(userList), (socketId, next) => {
      utils.socketFromSession(socketId).then((socket) => {
        const stationId = userList[socketId];
        // The entry was removed (e.g. by `leave`) while the socket lookup was pending.
        if (stationId === undefined) return next();

        if (!socket || Object.keys(socket.rooms).indexOf(`station.${stationId}`) === -1) {
          markOnce(stationsCountUpdated, stationId);
          markOnce(stationsUpdated, stationId);
          delete userList[socketId];
          return next();
        }

        currentCounts[stationId] = (currentCounts[stationId] || 0) + 1;
        if (!currentUsers[stationId]) currentUsers[stationId] = [];

        async.waterfall([
          (next) => {
            if (!socket.session || !socket.session.sessionId) return next('No session found.');
            cache.hget('sessions', socket.session.sessionId, next);
          },

          (session, next) => {
            if (!session) return next('Session not found.');
            db.models.user.findOne({ _id: session.userId }, next);
          },

          (user, next) => {
            if (!user) return next('User not found.');
            if (currentUsers[stationId].indexOf(user.username) !== -1) return next('User already in the list.');
            next(null, user.username);
          }
        ], (err, username) => {
          // Re-check for duplicates: lookups for several sockets of the same user run concurrently,
          // so both may have passed the check above before either one was pushed.
          if (!err && currentUsers[stationId].indexOf(username) === -1) {
            currentUsers[stationId].push(username);
          }
          // Lookup failures only mean the socket is counted without a username.
          next();
        });
      }, () => {
        // The socket lookup itself failed; skip this socket for this recount instead of
        // leaving the iteration waiting forever.
        next();
      });
      //TODO Code to show users
    }, () => {
      // Compare over the union of old and new station ids, so that a station whose last user
      // left (and therefore has no entry in the new maps) is still reported as changed.
      _.union(Object.keys(previousCounts), Object.keys(currentCounts)).forEach((stationId) => {
        if ((previousCounts[stationId] || 0) !== (currentCounts[stationId] || 0)) {
          markOnce(stationsCountUpdated, stationId);
        }
      });

      _.union(Object.keys(previousUsers), Object.keys(currentUsers)).forEach((stationId) => {
        const before = previousUsers[stationId] || [];
        const after = currentUsers[stationId] || [];
        if (_.difference(after, before).length > 0 || _.difference(before, after).length > 0) {
          markOnce(stationsUpdated, stationId);
        }
      });

      stationsCountUpdated.forEach((stationId) => {
        cache.pub('station.updateUserCount', stationId);
      });

      stationsUpdated.forEach((stationId) => {
        cache.pub('station.updateUsers', stationId);
      });
    });
  }, 3000);
  // Sends the most recently computed username list of a station to everyone in that station's room.
  cache.sub('station.updateUsers', (stationId) => {
    const room = `station.${stationId}`;
    const usernames = usersPerStation[stationId] ? usersPerStation[stationId] : [];
    utils.emitToRoom(room, "event:users.updated", usernames);
  });
  /**
   * Broadcasts the most recently computed user count of a station.
   *
   * The count always goes to the station's own room. For the `home` room it goes to everyone when
   * the station is public; otherwise only to sockets whose user is an admin, or the owner of a
   * community station.
   */
  cache.sub('station.updateUserCount', (stationId) => {
    const count = usersPerStationCount[stationId] || 0;
    utils.emitToRoom(`station.${stationId}`, "event:userCount.updated", count);

    stations.getStation(stationId, async (err, station) => {
      // The station may no longer be retrievable when the message arrives; there is then
      // nothing to announce on the home page.
      if (err || !station) return;

      if (station.privacy === 'public') {
        utils.emitToRoom('home', "event:userCount.updated", stationId, count);
        return;
      }

      let sockets;
      try {
        sockets = await utils.getRoomSockets('home');
      } catch (socketsErr) {
        logger.error("UPDATE_STATION_USER_COUNT", `Getting home sockets for station "${stationId}" failed. "${socketsErr}"`);
        return;
      }

      for (const socketId in sockets) {
        const socket = sockets[socketId];
        const socketSession = socket.session;
        // Sockets without a logged-in session can never see a non-public station.
        if (!socketSession || !socketSession.sessionId) continue;

        cache.hget('sessions', socketSession.sessionId, (err, session) => {
          if (err || !session) return;
          db.models.user.findOne({ _id: session.userId }, (err, user) => {
            if (err || !user) return;
            const isAdmin = user.role === 'admin';
            const isCommunityOwner = station.type === "community" && station.owner === session.userId;
            if (isAdmin || isCommunityOwner) socket.emit("event:userCount.updated", stationId, count);
          });
        });
      }
    });
  });
  // Relays a queue lock change to the affected station's room.
  cache.sub('station.queueLockToggled', ({ stationId, locked }) => {
    utils.emitToRoom(`station.${stationId}`, "event:queueLockToggled", locked);
  });

  // Relays a party mode change to the affected station's room.
  cache.sub('station.updatePartyMode', ({ stationId, partyMode }) => {
    utils.emitToRoom(`station.${stationId}`, "event:partyMode.updated", partyMode);
  });

  // Relays the newly selected private playlist id to the affected station's room.
  cache.sub('privatePlaylist.selected', ({ stationId, playlistId }) => {
    utils.emitToRoom(`station.${stationId}`, "event:privatePlaylist.selected", playlistId);
  });
  // Tells a station's room that the station was paused, including the time it was paused at.
  cache.sub('station.pause', (stationId) => {
    stations.getStation(stationId, (err, station) => {
      // Without a station there is no `pausedAt` to report; skip rather than throw.
      if (err || !station) return;
      utils.emitToRoom(`station.${stationId}`, "event:stations.pause", { pausedAt: station.pausedAt });
    });
  });
  // Tells a station's room that the station was resumed, including its accumulated paused time.
  cache.sub('station.resume', (stationId) => {
    stations.getStation(stationId, (err, station) => {
      // Without a station there is no `timePaused` to report; skip rather than throw.
      if (err || !station) return;
      utils.emitToRoom(`station.${stationId}`, "event:stations.resume", { timePaused: station.timePaused });
    });
  });
  // Sends the station's current queue to its room whenever the queue changed.
  cache.sub('station.queueUpdate', (stationId) => {
    stations.getStation(stationId, (err, station) => {
      if (err) return;
      utils.emitToRoom(`station.${stationId}`, "event:queue.update", station.queue);
    });
  });

  // Notifies a station's room that somebody voted to skip the current song.
  cache.sub('station.voteSkipSong', (stationId) => {
    const room = `station.${stationId}`;
    utils.emitToRoom(room, "event:song.voteSkipSong");
  });

  // Notifies both the station's room and the admin stations room that the station was removed.
  cache.sub('station.remove', (stationId) => {
    const room = `station.${stationId}`;
    utils.emitToRoom(room, 'event:stations.remove');
    utils.emitToRoom('admin.stations', 'event:admin.station.removed', stationId);
  });
  /**
   * Initializes a newly created station and announces it.
   *
   * The station always goes to the `admin.stations` room. For the `home` room it goes to everyone
   * when the station is public; otherwise only to sockets whose user is an admin, or the owner of
   * a community station.
   */
  cache.sub('station.create', (stationId) => {
    stations.initializeStation(stationId, async (err, station) => {
      if (err) console.error(err);
      // If initialization yielded no station there is nothing to announce.
      if (!station) return;

      station.userCount = usersPerStationCount[stationId] || 0;
      utils.emitToRoom('admin.stations', 'event:admin.station.added', station);

      // TODO If community, check if on whitelist
      if (station.privacy === 'public') {
        utils.emitToRoom('home', "event:stations.created", station);
        return;
      }

      let sockets;
      try {
        sockets = await utils.getRoomSockets('home');
      } catch (socketsErr) {
        console.error(socketsErr);
        return;
      }

      for (const socketId in sockets) {
        const socket = sockets[socketId];
        const socketSession = socket.session;
        // Sockets without a logged-in session can never see a non-public station.
        if (!socketSession || !socketSession.sessionId) continue;

        cache.hget('sessions', socketSession.sessionId, (err, session) => {
          if (err || !session) return;
          db.models.user.findOne({ _id: session.userId }, (err, user) => {
            if (err || !user) return;
            const isAdmin = user.role === 'admin';
            const isCommunityOwner = station.type === "community" && station.owner === session.userId;
            if (isAdmin || isCommunityOwner) socket.emit("event:stations.created", station);
          });
        });
      }
    });
  });
  166. module.exports = {
  167. /**
  168. * Get a list of all the stations
  169. *
  170. * @param session
  171. * @param cb
  172. * @return {{ status: String, stations: Array }}
  173. */
  174. index: (session, cb) => {
  175. async.waterfall([
  176. (next) => {
  177. cache.hgetall('stations', next);
  178. },
  179. (stations, next) => {
  180. let resultStations = [];
  181. for (let id in stations) {
  182. resultStations.push(stations[id]);
  183. }
  184. next(null, stations);
  185. },
  186. (stationsArray, next) => {
  187. let resultStations = [];
  188. async.each(stationsArray, (station, next) => {
  189. async.waterfall([
  190. (next) => {
  191. stations.canUserViewStation(station, session.userId, (err, exists) => {
  192. next(err, exists);
  193. });
  194. }
  195. ], (err, exists) => {
  196. station.userCount = usersPerStationCount[station._id] || 0;
  197. if (exists) resultStations.push(station);
  198. next();
  199. });
  200. }, () => {
  201. next(null, resultStations);
  202. });
  203. }
  204. ], async (err, stations) => {
  205. if (err) {
  206. err = await utils.getError(err);
  207. logger.error("STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  208. return cb({'status': 'failure', 'message': err});
  209. }
  210. logger.success("STATIONS_INDEX", `Indexing stations successful.`, false);
  211. return cb({'status': 'success', 'stations': stations});
  212. });
  213. },
  214. /**
  215. * Verifies that a station exists
  216. *
  217. * @param session
  218. * @param stationName - the station name
  219. * @param cb
  220. */
  221. existsByName: (session, stationName, cb) => {
  222. async.waterfall([
  223. (next) => {
  224. stations.getStationByName(stationName, next);
  225. },
  226. (station, next) => {
  227. if (!station) return next(null, false);
  228. stations.canUserViewStation(station, session.userId, (err, exists) => {
  229. next(err, exists);
  230. });
  231. }
  232. ], async (err, exists) => {
  233. if (err) {
  234. err = await utils.getError(err);
  235. logger.error("STATION_EXISTS_BY_NAME", `Checking if station "${stationName}" exists failed. "${err}"`);
  236. return cb({'status': 'failure', 'message': err});
  237. }
  238. logger.success("STATION_EXISTS_BY_NAME", `Station "${stationName}" exists successfully.`/*, false*/);
  239. cb({status: 'success', exists});
  240. });
  241. },
  242. /**
  243. * Gets the official playlist for a station
  244. *
  245. * @param session
  246. * @param stationId - the station id
  247. * @param cb
  248. */
  249. getPlaylist: (session, stationId, cb) => {
  250. async.waterfall([
  251. (next) => {
  252. stations.getStation(stationId, next);
  253. },
  254. (station, next) => {
  255. stations.canUserViewStation(station, session.userId, (err, canView) => {
  256. if (err) return next(err);
  257. if (canView) return next(null, station);
  258. return next('Insufficient permissions.');
  259. });
  260. },
  261. (station, next) => {
  262. if (!station) return next('Station not found.');
  263. else if (station.type !== 'official') return next('This is not an official station.');
  264. else next();
  265. },
  266. (next) => {
  267. cache.hget('officialPlaylists', stationId, next);
  268. },
  269. (playlist, next) => {
  270. if (!playlist) return next('Playlist not found.');
  271. next(null, playlist);
  272. }
  273. ], async (err, playlist) => {
  274. if (err) {
  275. err = await utils.getError(err);
  276. logger.error("STATIONS_GET_PLAYLIST", `Getting playlist for station "${stationId}" failed. "${err}"`);
  277. return cb({ status: 'failure', message: err });
  278. } else {
  279. logger.success("STATIONS_GET_PLAYLIST", `Got playlist for station "${stationId}" successfully.`, false);
  280. cb({ status: 'success', data: playlist.songs });
  281. }
  282. });
  283. },
  284. /**
  285. * Joins the station by its name
  286. *
  287. * @param session
  288. * @param stationName - the station name
  289. * @param cb
  290. * @return {{ status: String, userCount: Integer }}
  291. */
  292. join: (session, stationName, cb) => {
  293. async.waterfall([
  294. (next) => {
  295. stations.getStationByName(stationName, next);
  296. },
  297. (station, next) => {
  298. if (!station) return next('Station not found.');
  299. stations.canUserViewStation(station, session.userId, (err, canView) => {
  300. if (err) return next(err);
  301. if (!canView) next("Not allowed to join station.");
  302. else next(null, station);
  303. });
  304. },
  305. (station, next) => {
  306. utils.socketJoinRoom(session.socketId, `station.${station._id}`);
  307. let data = {
  308. _id: station._id,
  309. type: station.type,
  310. currentSong: station.currentSong,
  311. startedAt: station.startedAt,
  312. paused: station.paused,
  313. timePaused: station.timePaused,
  314. pausedAt: station.pausedAt,
  315. description: station.description,
  316. displayName: station.displayName,
  317. privacy: station.privacy,
  318. locked: station.locked,
  319. partyMode: station.partyMode,
  320. owner: station.owner,
  321. privatePlaylist: station.privatePlaylist
  322. };
  323. userList[session.socketId] = station._id;
  324. next(null, data);
  325. },
  326. (data, next) => {
  327. data.userCount = usersPerStationCount[data._id] || 0;
  328. data.users = usersPerStation[data._id] || [];
  329. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  330. utils.socketJoinSongRoom(session.socketId, `song.${data.currentSong.songId}`);
  331. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  332. songs.getSongFromId(data.currentSong.songId, (err, song) => {
  333. if (!err && song) {
  334. data.currentSong.likes = song.likes;
  335. data.currentSong.dislikes = song.dislikes;
  336. } else {
  337. data.currentSong.likes = -1;
  338. data.currentSong.dislikes = -1;
  339. }
  340. next(null, data);
  341. });
  342. }
  343. ], async (err, data) => {
  344. if (err) {
  345. err = await utils.getError(err);
  346. logger.error("STATIONS_JOIN", `Joining station "${stationName}" failed. "${err}"`);
  347. return cb({'status': 'failure', 'message': err});
  348. }
  349. logger.success("STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  350. cb({status: 'success', data});
  351. });
  352. },
  353. /**
  354. * Toggles if a station is locked
  355. *
  356. * @param session
  357. * @param stationId - the station id
  358. * @param cb
  359. */
  360. toggleLock: hooks.ownerRequired((session, stationId, cb) => {
  361. async.waterfall([
  362. (next) => {
  363. stations.getStation(stationId, next);
  364. },
  365. (station, next) => {
  366. db.models.station.updateOne({ _id: stationId }, { $set: { locked: !station.locked} }, next);
  367. },
  368. (res, next) => {
  369. stations.updateStation(stationId, next);
  370. }
  371. ], async (err, station) => {
  372. if (err) {
  373. err = await utils.getError(err);
  374. logger.error("STATIONS_UPDATE_LOCKED_STATUS", `Toggling the queue lock for station "${stationId}" failed. "${err}"`);
  375. return cb({ status: 'failure', message: err });
  376. } else {
  377. logger.success("STATIONS_UPDATE_LOCKED_STATUS", `Toggled the queue lock for station "${stationId}" successfully to "${station.locked}".`);
  378. cache.pub('station.queueLockToggled', {stationId, locked: station.locked});
  379. return cb({ status: 'success', data: station.locked });
  380. }
  381. });
  382. }),
  383. /**
  384. * Votes to skip a station
  385. *
  386. * @param session
  387. * @param stationId - the station id
  388. * @param cb
  389. */
  390. voteSkip: hooks.loginRequired((session, stationId, cb) => {
  391. async.waterfall([
  392. (next) => {
  393. stations.getStation(stationId, next);
  394. },
  395. (station, next) => {
  396. if (!station) return next('Station not found.');
  397. stations.canUserViewStation(station, session.userId, (err, canView) => {
  398. if (err) return next(err);
  399. if (canView) return next(null, station);
  400. return next('Insufficient permissions.');
  401. });
  402. },
  403. (station, next) => {
  404. if (!station.currentSong) return next('There is currently no song to skip.');
  405. if (station.currentSong.skipVotes.indexOf(session.userId) !== -1) return next('You have already voted to skip this song.');
  406. next(null, station);
  407. },
  408. (station, next) => {
  409. db.models.station.updateOne({_id: stationId}, {$push: {"currentSong.skipVotes": session.userId}}, next)
  410. },
  411. (res, next) => {
  412. stations.updateStation(stationId, next);
  413. },
  414. (station, next) => {
  415. if (!station) return next('Station not found.');
  416. next(null, station);
  417. }
  418. ], async (err, station) => {
  419. if (err) {
  420. err = await utils.getError(err);
  421. logger.error("STATIONS_VOTE_SKIP", `Vote skipping station "${stationId}" failed. "${err}"`);
  422. return cb({'status': 'failure', 'message': err});
  423. }
  424. logger.success("STATIONS_VOTE_SKIP", `Vote skipping "${stationId}" successful.`);
  425. cache.pub('station.voteSkipSong', stationId);
  426. if (station.currentSong && station.currentSong.skipVotes.length >= 3) stations.skipStation(stationId)();
  427. cb({ status: 'success', message: 'Successfully voted to skip the song.' });
  428. });
  429. }),
  430. /**
  431. * Force skips a station
  432. *
  433. * @param session
  434. * @param stationId - the station id
  435. * @param cb
  436. */
  437. forceSkip: hooks.ownerRequired((session, stationId, cb) => {
  438. async.waterfall([
  439. (next) => {
  440. stations.getStation(stationId, next);
  441. },
  442. (station, next) => {
  443. if (!station) return next('Station not found.');
  444. next();
  445. }
  446. ], async (err) => {
  447. if (err) {
  448. err = await utils.getError(err);
  449. logger.error("STATIONS_FORCE_SKIP", `Force skipping station "${stationId}" failed. "${err}"`);
  450. return cb({'status': 'failure', 'message': err});
  451. }
  452. notifications.unschedule(`stations.nextSong?id=${stationId}`);
  453. stations.skipStation(stationId)();
  454. logger.success("STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  455. return cb({'status': 'success', 'message': 'Successfully skipped station.'});
  456. });
  457. }),
  458. /**
  459. * Leaves the user's current station
  460. *
  461. * @param session
  462. * @param stationId
  463. * @param cb
  464. * @return {{ status: String, userCount: Integer }}
  465. */
  466. leave: (session, stationId, cb) => {
  467. async.waterfall([
  468. (next) => {
  469. stations.getStation(stationId, next);
  470. },
  471. (station, next) => {
  472. if (!station) return next('Station not found.');
  473. next();
  474. }
  475. ], async (err, userCount) => {
  476. if (err) {
  477. err = await utils.getError(err);
  478. logger.error("STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  479. return cb({'status': 'failure', 'message': err});
  480. }
  481. logger.success("STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  482. utils.socketLeaveRooms(session);
  483. delete userList[session.socketId];
  484. return cb({'status': 'success', 'message': 'Successfully left station.', userCount});
  485. });
  486. },
  487. /**
  488. * Updates a station's name
  489. *
  490. * @param session
  491. * @param stationId - the station id
  492. * @param newName - the new station name
  493. * @param cb
  494. */
  495. updateName: hooks.ownerRequired((session, stationId, newName, cb) => {
  496. async.waterfall([
  497. (next) => {
  498. db.models.station.updateOne({_id: stationId}, {$set: {name: newName}}, {runValidators: true}, next);
  499. },
  500. (res, next) => {
  501. stations.updateStation(stationId, next);
  502. }
  503. ], async (err) => {
  504. if (err) {
  505. err = await utils.getError(err);
  506. logger.error("STATIONS_UPDATE_NAME", `Updating station "${stationId}" name to "${newName}" failed. "${err}"`);
  507. return cb({'status': 'failure', 'message': err});
  508. }
  509. logger.success("STATIONS_UPDATE_NAME", `Updated station "${stationId}" name to "${newName}" successfully.`);
  510. return cb({'status': 'success', 'message': 'Successfully updated the name.'});
  511. });
  512. }),
  513. /**
  514. * Updates a station's display name
  515. *
  516. * @param session
  517. * @param stationId - the station id
  518. * @param newDisplayName - the new station display name
  519. * @param cb
  520. */
  521. updateDisplayName: hooks.ownerRequired((session, stationId, newDisplayName, cb) => {
  522. async.waterfall([
  523. (next) => {
  524. db.models.station.updateOne({_id: stationId}, {$set: {displayName: newDisplayName}}, {runValidators: true}, next);
  525. },
  526. (res, next) => {
  527. stations.updateStation(stationId, next);
  528. }
  529. ], async (err) => {
  530. if (err) {
  531. err = await utils.getError(err);
  532. logger.error("STATIONS_UPDATE_DISPLAY_NAME", `Updating station "${stationId}" displayName to "${newDisplayName}" failed. "${err}"`);
  533. return cb({'status': 'failure', 'message': err});
  534. }
  535. logger.success("STATIONS_UPDATE_DISPLAY_NAME", `Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`);
  536. return cb({'status': 'success', 'message': 'Successfully updated the display name.'});
  537. });
  538. }),
  539. /**
  540. * Updates a station's description
  541. *
  542. * @param session
  543. * @param stationId - the station id
  544. * @param newDescription - the new station description
  545. * @param cb
  546. */
  547. updateDescription: hooks.ownerRequired((session, stationId, newDescription, cb) => {
  548. async.waterfall([
  549. (next) => {
  550. db.models.station.updateOne({_id: stationId}, {$set: {description: newDescription}}, {runValidators: true}, next);
  551. },
  552. (res, next) => {
  553. stations.updateStation(stationId, next);
  554. }
  555. ], async (err) => {
  556. if (err) {
  557. err = await utils.getError(err);
  558. logger.error("STATIONS_UPDATE_DESCRIPTION", `Updating station "${stationId}" description to "${newDescription}" failed. "${err}"`);
  559. return cb({'status': 'failure', 'message': err});
  560. }
  561. logger.success("STATIONS_UPDATE_DESCRIPTION", `Updated station "${stationId}" description to "${newDescription}" successfully.`);
  562. return cb({'status': 'success', 'message': 'Successfully updated the description.'});
  563. });
  564. }),
  565. /**
  566. * Updates a station's privacy
  567. *
  568. * @param session
  569. * @param stationId - the station id
  570. * @param newPrivacy - the new station privacy
  571. * @param cb
  572. */
  573. updatePrivacy: hooks.ownerRequired((session, stationId, newPrivacy, cb) => {
  574. async.waterfall([
  575. (next) => {
  576. db.models.station.updateOne({_id: stationId}, {$set: {privacy: newPrivacy}}, {runValidators: true}, next);
  577. },
  578. (res, next) => {
  579. stations.updateStation(stationId, next);
  580. }
  581. ], async (err) => {
  582. if (err) {
  583. err = await utils.getError(err);
  584. logger.error("STATIONS_UPDATE_PRIVACY", `Updating station "${stationId}" privacy to "${newPrivacy}" failed. "${err}"`);
  585. return cb({'status': 'failure', 'message': err});
  586. }
  587. logger.success("STATIONS_UPDATE_PRIVACY", `Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`);
  588. return cb({'status': 'success', 'message': 'Successfully updated the privacy.'});
  589. });
  590. }),
  591. /**
  592. * Updates a station's genres
  593. *
  594. * @param session
  595. * @param stationId - the station id
  596. * @param newGenres - the new station genres
  597. * @param cb
  598. */
  599. updateGenres: hooks.ownerRequired((session, stationId, newGenres, cb) => {
  600. async.waterfall([
  601. (next) => {
  602. db.models.station.updateOne({_id: stationId}, {$set: {genres: newGenres}}, {runValidators: true}, next);
  603. },
  604. (res, next) => {
  605. stations.updateStation(stationId, next);
  606. }
  607. ], async (err) => {
  608. if (err) {
  609. err = await utils.getError(err);
  610. logger.error("STATIONS_UPDATE_GENRES", `Updating station "${stationId}" genres to "${newGenres}" failed. "${err}"`);
  611. return cb({'status': 'failure', 'message': err});
  612. }
  613. logger.success("STATIONS_UPDATE_GENRES", `Updated station "${stationId}" genres to "${newGenres}" successfully.`);
  614. return cb({'status': 'success', 'message': 'Successfully updated the genres.'});
  615. });
  616. }),
  617. /**
  618. * Updates a station's blacklisted genres
  619. *
  620. * @param session
  621. * @param stationId - the station id
  622. * @param newBlacklistedGenres - the new station blacklisted genres
  623. * @param cb
  624. */
  625. updateBlacklistedGenres: hooks.ownerRequired((session, stationId, newBlacklistedGenres, cb) => {
  626. async.waterfall([
  627. (next) => {
  628. db.models.station.updateOne({_id: stationId}, {$set: {blacklistedGenres: newBlacklistedGenres}}, {runValidators: true}, next);
  629. },
  630. (res, next) => {
  631. stations.updateStation(stationId, next);
  632. }
  633. ], async (err) => {
  634. if (err) {
  635. err = await utils.getError(err);
  636. logger.error("STATIONS_UPDATE_BLACKLISTED_GENRES", `Updating station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" failed. "${err}"`);
  637. return cb({'status': 'failure', 'message': err});
  638. }
  639. logger.success("STATIONS_UPDATE_BLACKLISTED_GENRES", `Updated station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" successfully.`);
  640. return cb({'status': 'success', 'message': 'Successfully updated the blacklisted genres.'});
  641. });
  642. }),
  643. /**
  644. * Updates a station's party mode
  645. *
  646. * @param session
  647. * @param stationId - the station id
  648. * @param newPartyMode - the new station party mode
  649. * @param cb
  650. */
  651. updatePartyMode: hooks.ownerRequired((session, stationId, newPartyMode, cb) => {
  652. async.waterfall([
  653. (next) => {
  654. stations.getStation(stationId, next);
  655. },
  656. (station, next) => {
  657. if (!station) return next('Station not found.');
  658. if (station.partyMode === newPartyMode) return next('The party mode was already ' + ((newPartyMode) ? 'enabled.' : 'disabled.'));
  659. db.models.station.updateOne({_id: stationId}, {$set: {partyMode: newPartyMode}}, {runValidators: true}, next);
  660. },
  661. (res, next) => {
  662. stations.updateStation(stationId, next);
  663. }
  664. ], async (err) => {
  665. if (err) {
  666. err = await utils.getError(err);
  667. logger.error("STATIONS_UPDATE_PARTY_MODE", `Updating station "${stationId}" party mode to "${newPartyMode}" failed. "${err}"`);
  668. return cb({'status': 'failure', 'message': err});
  669. }
  670. logger.success("STATIONS_UPDATE_PARTY_MODE", `Updated station "${stationId}" party mode to "${newPartyMode}" successfully.`);
  671. cache.pub('station.updatePartyMode', {stationId: stationId, partyMode: newPartyMode});
  672. stations.skipStation(stationId)();
  673. return cb({'status': 'success', 'message': 'Successfully updated the party mode.'});
  674. });
  675. }),
  676. /**
  677. * Pauses a station
  678. *
  679. * @param session
  680. * @param stationId - the station id
  681. * @param cb
  682. */
  683. pause: hooks.ownerRequired((session, stationId, cb) => {
  684. async.waterfall([
  685. (next) => {
  686. stations.getStation(stationId, next);
  687. },
  688. (station, next) => {
  689. if (!station) return next('Station not found.');
  690. if (station.paused) return next('That station was already paused.');
  691. db.models.station.updateOne({_id: stationId}, {$set: {paused: true, pausedAt: Date.now()}}, next);
  692. },
  693. (res, next) => {
  694. stations.updateStation(stationId, next);
  695. }
  696. ], async (err) => {
  697. if (err) {
  698. err = await utils.getError(err);
  699. logger.error("STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  700. return cb({'status': 'failure', 'message': err});
  701. }
  702. logger.success("STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  703. cache.pub('station.pause', stationId);
  704. notifications.unschedule(`stations.nextSong?id=${stationId}`);
  705. return cb({'status': 'success', 'message': 'Successfully paused.'});
  706. });
  707. }),
  708. /**
  709. * Resumes a station
  710. *
  711. * @param session
  712. * @param stationId - the station id
  713. * @param cb
  714. */
  715. resume: hooks.ownerRequired((session, stationId, cb) => {
  716. async.waterfall([
  717. (next) => {
  718. stations.getStation(stationId, next);
  719. },
  720. (station, next) => {
  721. if (!station) return next('Station not found.');
  722. if (!station.paused) return next('That station is not paused.');
  723. station.timePaused += (Date.now() - station.pausedAt);
  724. db.models.station.updateOne({_id: stationId}, {$set: {paused: false}, $inc: {timePaused: Date.now() - station.pausedAt}}, next);
  725. },
  726. (res, next) => {
  727. stations.updateStation(stationId, next);
  728. }
  729. ], async (err) => {
  730. if (err) {
  731. err = await utils.getError(err);
  732. logger.error("STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  733. return cb({'status': 'failure', 'message': err});
  734. }
  735. logger.success("STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  736. cache.pub('station.resume', stationId);
  737. return cb({'status': 'success', 'message': 'Successfully resumed.'});
  738. });
  739. }),
  740. /**
  741. * Removes a station
  742. *
  743. * @param session
  744. * @param stationId - the station id
  745. * @param cb
  746. */
  747. remove: hooks.ownerRequired((session, stationId, cb) => {
  748. async.waterfall([
  749. (next) => {
  750. db.models.station.deleteOne({ _id: stationId }, err => next(err));
  751. },
  752. (next) => {
  753. cache.hdel('stations', stationId, err => next(err));
  754. }
  755. ], async (err) => {
  756. if (err) {
  757. err = await utils.getError(err);
  758. logger.error("STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  759. return cb({ 'status': 'failure', 'message': err });
  760. }
  761. logger.success("STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  762. cache.pub('station.remove', stationId);
  763. return cb({ 'status': 'success', 'message': 'Successfully removed.' });
  764. });
  765. }),
  766. /**
  767. * Create a station
  768. *
  769. * @param session
  770. * @param data - the station data
  771. * @param cb
  772. */
  773. create: hooks.loginRequired((session, data, cb) => {
  774. data.name = data.name.toLowerCase();
  775. let blacklist = ["country", "edm", "musare", "hip-hop", "rap", "top-hits", "todays-hits", "old-school", "christmas", "about", "support", "staff", "help", "news", "terms", "privacy", "profile", "c", "community", "tos", "login", "register", "p", "official", "o", "trap", "faq", "team", "donate", "buy", "shop", "forums", "explore", "settings", "admin", "auth", "reset_password"];
  776. async.waterfall([
  777. (next) => {
  778. if (!data) return next('Invalid data.');
  779. next();
  780. },
  781. (next) => {
  782. db.models.station.findOne({ $or: [{name: data.name}, {displayName: new RegExp(`^${data.displayName}$`, 'i')}] }, next);
  783. },
  784. (station, next) => {
  785. if (station) return next('A station with that name or display name already exists.');
  786. const { name, displayName, description, genres, playlist, type, blacklistedGenres } = data;
  787. if (type === 'official') {
  788. db.models.user.findOne({_id: session.userId}, (err, user) => {
  789. if (err) return next(err);
  790. if (!user) return next('User not found.');
  791. if (user.role !== 'admin') return next('Admin required.');
  792. db.models.station.create({
  793. name,
  794. displayName,
  795. description,
  796. type,
  797. privacy: 'private',
  798. playlist,
  799. genres,
  800. blacklistedGenres,
  801. currentSong: stations.defaultSong
  802. }, next);
  803. });
  804. } else if (type === 'community') {
  805. if (blacklist.indexOf(name) !== -1) return next('That name is blacklisted. Please use a different name.');
  806. db.models.station.create({
  807. name,
  808. displayName,
  809. description,
  810. type,
  811. privacy: 'private',
  812. owner: session.userId,
  813. queue: [],
  814. currentSong: null
  815. }, next);
  816. }
  817. }
  818. ], async (err, station) => {
  819. if (err) {
  820. err = await utils.getError(err);
  821. logger.error("STATIONS_CREATE", `Creating station failed. "${err}"`);
  822. return cb({'status': 'failure', 'message': err});
  823. }
  824. logger.success("STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  825. cache.pub('station.create', station._id);
  826. return cb({'status': 'success', 'message': 'Successfully created station.'});
  827. });
  828. }),
  829. /**
  830. * Adds song to station queue
  831. *
  832. * @param session
  833. * @param stationId - the station id
  834. * @param songId - the song id
  835. * @param cb
  836. */
  837. addToQueue: hooks.loginRequired((session, stationId, songId, cb) => {
  838. async.waterfall([
  839. (next) => {
  840. stations.getStation(stationId, next);
  841. },
  842. (station, next) => {
  843. if (!station) return next('Station not found.');
  844. if (station.locked) {
  845. db.models.user.findOne({ _id: session.userId }, (err, user) => {
  846. if (user.role !== 'admin' && station.owner !== session.userId) return next('Only owners and admins can add songs to a locked queue.');
  847. else return next(null, station);
  848. });
  849. } else {
  850. return next(null, station);
  851. }
  852. },
  853. (station, next) => {
  854. if (station.type !== 'community') return next('That station is not a community station.');
  855. stations.canUserViewStation(station, session.userId, (err, canView) => {
  856. if (err) return next(err);
  857. if (canView) return next(null, station);
  858. return next('Insufficient permissions.');
  859. });
  860. },
  861. (station, next) => {
  862. if (station.currentSong && station.currentSong.songId === songId) return next('That song is currently playing.');
  863. async.each(station.queue, (queueSong, next) => {
  864. if (queueSong.songId === songId) return next('That song is already in the queue.');
  865. next();
  866. }, (err) => {
  867. next(err, station);
  868. });
  869. },
  870. (station, next) => {
  871. songs.getSong(songId, (err, song) => {
  872. if (!err && song) return next(null, song, station);
  873. utils.getSongFromYouTube(songId, (song) => {
  874. song.artists = [];
  875. song.skipDuration = 0;
  876. song.likes = -1;
  877. song.dislikes = -1;
  878. song.thumbnail = "empty";
  879. song.explicit = false;
  880. next(null, song, station);
  881. });
  882. });
  883. },
  884. (song, station, next) => {
  885. let queue = station.queue;
  886. song.requestedBy = session.userId;
  887. queue.push(song);
  888. let totalDuration = 0;
  889. queue.forEach((song) => {
  890. totalDuration += song.duration;
  891. });
  892. if (totalDuration >= 3600 * 3) return next('The max length of the queue is 3 hours.');
  893. next(null, song, station);
  894. },
  895. (song, station, next) => {
  896. let queue = station.queue;
  897. if (queue.length === 0) return next(null, song, station);
  898. let totalDuration = 0;
  899. const userId = queue[queue.length - 1].requestedBy;
  900. station.queue.forEach((song) => {
  901. if (userId === song.requestedBy) {
  902. totalDuration += song.duration;
  903. }
  904. });
  905. if(totalDuration >= 900) return next('The max length of songs per user is 15 minutes.');
  906. next(null, song, station);
  907. },
  908. (song, station, next) => {
  909. let queue = station.queue;
  910. if (queue.length === 0) return next(null, song);
  911. let totalSongs = 0;
  912. const userId = queue[queue.length - 1].requestedBy;
  913. queue.forEach((song) => {
  914. if (userId === song.requestedBy) {
  915. totalSongs++;
  916. }
  917. });
  918. if (totalSongs <= 2) return next(null, song);
  919. if (totalSongs > 3) return next('The max amount of songs per user is 3, and only 2 in a row is allowed.');
  920. if (queue[queue.length - 2].requestedBy !== userId || queue[queue.length - 3] !== userId) return next('The max amount of songs per user is 3, and only 2 in a row is allowed.');
  921. next(null, song);
  922. },
  923. (song, next) => {
  924. db.models.station.updateOne({_id: stationId}, {$push: {queue: song}}, {runValidators: true}, next);
  925. },
  926. (res, next) => {
  927. stations.updateStation(stationId, next);
  928. }
  929. ], async (err, station) => {
  930. if (err) {
  931. err = await utils.getError(err);
  932. logger.error("STATIONS_ADD_SONG_TO_QUEUE", `Adding song "${songId}" to station "${stationId}" queue failed. "${err}"`);
  933. return cb({'status': 'failure', 'message': err});
  934. }
  935. logger.success("STATIONS_ADD_SONG_TO_QUEUE", `Added song "${songId}" to station "${stationId}" successfully.`);
  936. cache.pub('station.queueUpdate', stationId);
  937. return cb({'status': 'success', 'message': 'Successfully added song to queue.'});
  938. });
  939. }),
  940. /**
  941. * Removes song from station queue
  942. *
  943. * @param session
  944. * @param stationId - the station id
  945. * @param songId - the song id
  946. * @param cb
  947. */
  948. removeFromQueue: hooks.ownerRequired((session, stationId, songId, cb) => {
  949. async.waterfall([
  950. (next) => {
  951. if (!songId) return next('Invalid song id.');
  952. stations.getStation(stationId, next);
  953. },
  954. (station, next) => {
  955. if (!station) return next('Station not found.');
  956. if (station.type !== 'community') return next('Station is not a community station.');
  957. async.each(station.queue, (queueSong, next) => {
  958. if (queueSong.songId === songId) return next(true);
  959. next();
  960. }, (err) => {
  961. if (err === true) return next();
  962. next('Song is not currently in the queue.');
  963. });
  964. },
  965. (next) => {
  966. db.models.station.updateOne({_id: stationId}, {$pull: {queue: {songId: songId}}}, next);
  967. },
  968. (res, next) => {
  969. stations.updateStation(stationId, next);
  970. }
  971. ], async (err, station) => {
  972. if (err) {
  973. err = await utils.getError(err);
  974. logger.error("STATIONS_REMOVE_SONG_TO_QUEUE", `Removing song "${songId}" from station "${stationId}" queue failed. "${err}"`);
  975. return cb({'status': 'failure', 'message': err});
  976. }
  977. logger.success("STATIONS_REMOVE_SONG_TO_QUEUE", `Removed song "${songId}" from station "${stationId}" successfully.`);
  978. cache.pub('station.queueUpdate', stationId);
  979. return cb({'status': 'success', 'message': 'Successfully removed song from queue.'});
  980. });
  981. }),
  982. /**
  983. * Gets the queue from a station
  984. *
  985. * @param session
  986. * @param stationId - the station id
  987. * @param cb
  988. */
  989. getQueue: (session, stationId, cb) => {
  990. async.waterfall([
  991. (next) => {
  992. stations.getStation(stationId, next);
  993. },
  994. (station, next) => {
  995. if (!station) return next('Station not found.');
  996. if (station.type !== 'community') return next('Station is not a community station.');
  997. next(null, station);
  998. },
  999. (station, next) => {
  1000. stations.canUserViewStation(station, session.userId, (err, canView) => {
  1001. if (err) return next(err);
  1002. if (canView) return next(null, station);
  1003. return next('Insufficient permissions.');
  1004. });
  1005. }
  1006. ], async (err, station) => {
  1007. if (err) {
  1008. err = await utils.getError(err);
  1009. logger.error("STATIONS_GET_QUEUE", `Getting queue for station "${stationId}" failed. "${err}"`);
  1010. return cb({'status': 'failure', 'message': err});
  1011. }
  1012. logger.success("STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  1013. return cb({'status': 'success', 'message': 'Successfully got queue.', queue: station.queue});
  1014. });
  1015. },
  1016. /**
  1017. * Selects a private playlist for a station
  1018. *
  1019. * @param session
  1020. * @param stationId - the station id
  1021. * @param playlistId - the private playlist id
  1022. * @param cb
  1023. */
  1024. selectPrivatePlaylist: hooks.ownerRequired((session, stationId, playlistId, cb) => {
  1025. async.waterfall([
  1026. (next) => {
  1027. stations.getStation(stationId, next);
  1028. },
  1029. (station, next) => {
  1030. if (!station) return next('Station not found.');
  1031. if (station.type !== 'community') return next('Station is not a community station.');
  1032. if (station.privatePlaylist === playlistId) return next('That private playlist is already selected.');
  1033. db.models.playlist.findOne({_id: playlistId}, next);
  1034. },
  1035. (playlist, next) => {
  1036. if (!playlist) return next('Playlist not found.');
  1037. let currentSongIndex = (playlist.songs.length > 0) ? playlist.songs.length - 1 : 0;
  1038. db.models.station.updateOne({_id: stationId}, {$set: {privatePlaylist: playlistId, currentSongIndex: currentSongIndex}}, {runValidators: true}, next);
  1039. },
  1040. (res, next) => {
  1041. stations.updateStation(stationId, next);
  1042. }
  1043. ], async (err, station) => {
  1044. if (err) {
  1045. err = await utils.getError(err);
  1046. logger.error("STATIONS_SELECT_PRIVATE_PLAYLIST", `Selecting private playlist "${playlistId}" for station "${stationId}" failed. "${err}"`);
  1047. return cb({'status': 'failure', 'message': err});
  1048. }
  1049. logger.success("STATIONS_SELECT_PRIVATE_PLAYLIST", `Selected private playlist "${playlistId}" for station "${stationId}" successfully.`);
  1050. notifications.unschedule(`stations.nextSong?id${stationId}`);
  1051. if (!station.partyMode) stations.skipStation(stationId)();
  1052. cache.pub('privatePlaylist.selected', {playlistId, stationId});
  1053. return cb({'status': 'success', 'message': 'Successfully selected playlist.'});
  1054. });
  1055. }),
  1056. favoriteStation: hooks.loginRequired((session, stationId, cb) => {
  1057. async.waterfall([
  1058. (next) => {
  1059. stations.getStation(stationId, next);
  1060. },
  1061. (station, next) => {
  1062. if (!station) return next('Station not found.');
  1063. stations.canUserViewStation(station, session.userId, (err, canView) => {
  1064. if (err) return next(err);
  1065. if (canView) return next();
  1066. return next('Insufficient permissions.');
  1067. });
  1068. },
  1069. (next) => {
  1070. db.models.user.updateOne({ _id: session.userId }, { $addToSet: { favoriteStations: stationId } }, next);
  1071. },
  1072. (res, next) => {
  1073. if (res.nModified === 0) return next("The station was already favorited.");
  1074. next();
  1075. }
  1076. ], async (err) => {
  1077. if (err) {
  1078. err = await utils.getError(err);
  1079. logger.error("FAVORITE_STATION", `Favoriting station "${stationId}" failed. "${err}"`);
  1080. return cb({'status': 'failure', 'message': err});
  1081. }
  1082. logger.success("FAVORITE_STATION", `Favorited station "${stationId}" successfully.`);
  1083. cache.pub('user.favoritedStation', { userId: session.userId, stationId });
  1084. return cb({'status': 'success', 'message': 'Succesfully favorited station.'});
  1085. });
  1086. }),
  1087. unfavoriteStation: hooks.loginRequired((session, stationId, cb) => {
  1088. async.waterfall([
  1089. (next) => {
  1090. db.models.user.updateOne({ _id: session.userId }, { $pull: { favoriteStations: stationId } }, next);
  1091. },
  1092. (res, next) => {
  1093. if (res.nModified === 0) return next("The station wasn't favorited.");
  1094. next();
  1095. }
  1096. ], async (err) => {
  1097. if (err) {
  1098. err = await utils.getError(err);
  1099. logger.error("UNFAVORITE_STATION", `Unfavoriting station "${stationId}" failed. "${err}"`);
  1100. return cb({'status': 'failure', 'message': err});
  1101. }
  1102. logger.success("UNFAVORITE_STATION", `Unfavorited station "${stationId}" successfully.`);
  1103. cache.pub('user.unfavoritedStation', { userId: session.userId, stationId });
  1104. return cb({'status': 'success', 'message': 'Succesfully unfavorited station.'});
  1105. });
  1106. }),
  1107. };