stations.js 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257
  1. 'use strict';
  2. const async = require('async'),
  3. request = require('request'),
  4. config = require('config'),
  5. _ = require('underscore')._;
  6. const hooks = require('./hooks');
  7. const moduleManager = require("../../index");
  8. const db = moduleManager.modules["db"];
  9. const cache = moduleManager.modules["cache"];
  10. const notifications = moduleManager.modules["notifications"];
  11. const utils = moduleManager.modules["utils"];
  12. const logger = moduleManager.modules["logger"];
  13. const stations = moduleManager.modules["stations"];
  14. const songs = moduleManager.modules["songs"];
  // Socket id -> id of the station that socket joined. Written by `join`,
  // cleared by `leave` and by the sweep below.
  let userList = {};
  // Station id -> usernames collected for that station by the most recent sweep.
  let usersPerStation = {};
  // Station id -> number of tracked sockets counted for that station by the most recent sweep.
  let usersPerStationCount = {};

  /**
   * Periodic sweep (every 3 seconds) that rebuilds usersPerStation and
   * usersPerStationCount from userList, then publishes 'station.updateUserCount'
   * and 'station.updateUsers' for every station whose count or username list
   * differs from the previous sweep.
   *
   * Stations are compared over the union of the previous and the current
   * station ids, so a station that dropped to zero tracked sockets (for example
   * after its last listener called `leave`, which removes the userList entry
   * directly) is still reported as changed.
   */
  setInterval(() => {
    const stationsCountUpdated = [];
    const stationsUpdated = [];
    const markUpdated = (updatedList, stationId) => {
      if (updatedList.indexOf(stationId) === -1) updatedList.push(stationId);
    };

    const oldUsersPerStation = usersPerStation;
    const oldUsersPerStationCount = usersPerStationCount;
    // Local references keep this sweep's callbacks writing into this sweep's
    // objects even if a later sweep replaces the module-level bindings first.
    const newUsersPerStation = {};
    const newUsersPerStationCount = {};
    usersPerStation = newUsersPerStation;
    usersPerStationCount = newUsersPerStationCount;

    async.each(Object.keys(userList), (socketId, next) => {
      utils.socketFromSession(socketId).then((socket) => {
        const stationId = userList[socketId];
        // The entry was removed (e.g. by `leave`) while the socket was being resolved.
        if (stationId === undefined) return next();

        // Socket is gone or no longer in the station room: forget it and flag the station.
        if (!socket || Object.keys(socket.rooms).indexOf(`station.${stationId}`) === -1) {
          markUpdated(stationsCountUpdated, stationId);
          markUpdated(stationsUpdated, stationId);
          delete userList[socketId];
          return next();
        }

        if (!newUsersPerStationCount[stationId]) newUsersPerStationCount[stationId] = 0;
        newUsersPerStationCount[stationId]++;
        if (!newUsersPerStation[stationId]) newUsersPerStation[stationId] = [];

        // Resolve the socket's username; any failure just means the socket is
        // counted but contributes no name to the list.
        async.waterfall([
          (next) => {
            if (!socket.session || !socket.session.sessionId) return next('No session found.');
            cache.hget('sessions', socket.session.sessionId, next);
          },

          (session, next) => {
            if (!session) return next('Session not found.');
            db.models.user.findOne({_id: session.userId}, next);
          },

          (user, next) => {
            if (!user) return next('User not found.');
            if (newUsersPerStation[stationId].indexOf(user.username) !== -1) return next('User already in the list.');
            next(null, user.username);
          }
        ], (err, username) => {
          if (!err) newUsersPerStation[stationId].push(username);
          next();
        });
      }, (err) => {
        // Only rejections of socketFromSession land here; skip the socket so
        // the sweep still completes.
        logger.error("UPDATE_STATION_USER_LIST", `Resolving socket "${socketId}" failed. "${err}"`);
        next();
      });
      //TODO Code to show users
    }, () => {
      const countStationIds = Object.keys(oldUsersPerStationCount).concat(Object.keys(newUsersPerStationCount));
      countStationIds.forEach((stationId) => {
        if (oldUsersPerStationCount[stationId] !== newUsersPerStationCount[stationId]) {
          markUpdated(stationsCountUpdated, stationId);
        }
      });

      const listStationIds = Object.keys(oldUsersPerStation).concat(Object.keys(newUsersPerStation));
      listStationIds.forEach((stationId) => {
        const before = oldUsersPerStation[stationId] || [];
        const after = newUsersPerStation[stationId] || [];
        if (_.difference(after, before).length > 0 || _.difference(before, after).length > 0) {
          markUpdated(stationsUpdated, stationId);
        }
      });

      stationsCountUpdated.forEach((stationId) => {
        cache.pub('station.updateUserCount', stationId);
      });

      stationsUpdated.forEach((stationId) => {
        cache.pub('station.updateUsers', stationId);
      });
    });
  }, 3000);
  // Relays the most recently collected username list of a station (or an
  // empty list when none is recorded) to every socket in that station's room.
  cache.sub('station.updateUsers', (stationId) => {
    const usernames = usersPerStation[stationId] ? usersPerStation[stationId] : [];
    utils.emitToRoom(`station.${stationId}`, "event:users.updated", usernames);
  });
  /**
   * Relays a station's user count.
   *
   * The count always goes to the station's own room. For the 'home' room it is
   * broadcast when the station is public; otherwise it is sent only to sockets
   * whose cached session belongs to an admin, or to the owner of a community
   * station. Lookup failures (station, session or user missing) are skipped
   * instead of dereferencing a missing value.
   */
  cache.sub('station.updateUserCount', stationId => {
    const count = usersPerStationCount[stationId] || 0;
    utils.emitToRoom(`station.${stationId}`, "event:userCount.updated", count);

    stations.getStation(stationId, async (err, station) => {
      // The station could not be loaded, so there is nothing to report to 'home'.
      if (err || !station) return;

      if (station.privacy === 'public') {
        utils.emitToRoom('home', "event:userCount.updated", stationId, count);
        return;
      }

      const sockets = await utils.getRoomSockets('home');
      for (const socketId in sockets) {
        const socket = sockets[socketId];
        const socketSession = socket.session;
        if (!socketSession || !socketSession.sessionId) continue;

        cache.hget('sessions', socketSession.sessionId, (err, session) => {
          if (err || !session) return;
          db.models.user.findOne({_id: session.userId}, (err, user) => {
            if (err || !user) return;
            const isAdmin = user.role === 'admin';
            const isCommunityOwner = station.type === "community" && station.owner === session.userId;
            if (isAdmin || isCommunityOwner) socket.emit("event:userCount.updated", stationId, count);
          });
        });
      }
    });
  });
  // Forwards a queue-lock change to the sockets in the affected station's room.
  cache.sub('station.queueLockToggled', ({ stationId, locked }) => {
    const room = `station.${stationId}`;
    utils.emitToRoom(room, "event:queueLockToggled", locked);
  });

  // Forwards a party-mode change to the sockets in the affected station's room.
  cache.sub('station.updatePartyMode', ({ stationId, partyMode }) => {
    const room = `station.${stationId}`;
    utils.emitToRoom(room, "event:partyMode.updated", partyMode);
  });

  // Forwards a private-playlist selection to the sockets in the affected station's room.
  cache.sub('privatePlaylist.selected', ({ stationId, playlistId }) => {
    const room = `station.${stationId}`;
    utils.emitToRoom(room, "event:privatePlaylist.selected", playlistId);
  });
  // Tells a station's room that playback was paused, including when it was paused.
  // Nothing is emitted when the station cannot be loaded.
  cache.sub('station.pause', stationId => {
    stations.getStation(stationId, (err, station) => {
      if (err || !station) return;
      utils.emitToRoom(`station.${stationId}`, "event:stations.pause", { pausedAt: station.pausedAt });
    });
  });

  // Tells a station's room that playback resumed, including the accumulated paused time.
  // Nothing is emitted when the station cannot be loaded.
  cache.sub('station.resume', stationId => {
    stations.getStation(stationId, (err, station) => {
      if (err || !station) return;
      utils.emitToRoom(`station.${stationId}`, "event:stations.resume", { timePaused: station.timePaused });
    });
  });

  // Sends a station's current queue to its room.
  // Nothing is emitted when the station cannot be loaded.
  cache.sub('station.queueUpdate', stationId => {
    stations.getStation(stationId, (err, station) => {
      if (err || !station) return;
      utils.emitToRoom(`station.${stationId}`, "event:queue.update", station.queue);
    });
  });
  // Notifies a station's room that somebody voted to skip the current song.
  cache.sub('station.voteSkipSong', (stationId) => {
    const room = `station.${stationId}`;
    utils.emitToRoom(room, "event:song.voteSkipSong");
  });

  // Notifies a removed station's room, then the admin stations room (with the station id).
  cache.sub('station.remove', (stationId) => {
    const room = `station.${stationId}`;
    utils.emitToRoom(room, 'event:stations.remove');
    utils.emitToRoom('admin.stations', 'event:admin.station.removed', stationId);
  });
  /**
   * Initializes a newly created station and announces it.
   *
   * The station goes to the 'admin.stations' room unconditionally. For the
   * 'home' room it is broadcast when public; otherwise it is sent only to
   * sockets whose cached session belongs to an admin, or to the owner of a
   * community station. If initialization fails the error is logged and nothing
   * is announced, because there is no station object to send.
   */
  cache.sub('station.create', stationId => {
    stations.initializeStation(stationId, async (err, station) => {
      if (err || !station) {
        if (err) console.error(err);
        return;
      }

      station.userCount = usersPerStationCount[stationId] || 0;
      utils.emitToRoom('admin.stations', 'event:admin.station.added', station);

      // TODO If community, check if on whitelist
      if (station.privacy === 'public') {
        utils.emitToRoom('home', "event:stations.created", station);
        return;
      }

      const sockets = await utils.getRoomSockets('home');
      for (const socketId in sockets) {
        const socket = sockets[socketId];
        const socketSession = socket.session;
        if (!socketSession || !socketSession.sessionId) continue;

        cache.hget('sessions', socketSession.sessionId, (err, session) => {
          if (err || !session) return;
          db.models.user.findOne({_id: session.userId}, (err, user) => {
            if (err || !user) return;
            const isAdmin = user.role === 'admin';
            const isCommunityOwner = station.type === "community" && station.owner === session.userId;
            if (isAdmin || isCommunityOwner) socket.emit("event:stations.created", station);
          });
        });
      }
    });
  });
  166. module.exports = {
  167. /**
  168. * Get a list of all the stations
  169. *
  170. * @param session
  171. * @param cb
  172. * @return {{ status: String, stations: Array }}
  173. */
  174. index: (session, cb) => {
  175. async.waterfall([
  176. (next) => {
  177. cache.hgetall('stations', next);
  178. },
  179. (stations, next) => {
  180. let resultStations = [];
  181. for (let id in stations) {
  182. resultStations.push(stations[id]);
  183. }
  184. next(null, stations);
  185. },
  186. (stations, next) => {
  187. let resultStations = [];
  188. async.each(stations, (station, next) => {
  189. async.waterfall([
  190. (next) => {
  191. if (station.privacy === 'public') return next(true);
  192. if (!session.sessionId) return next(`Insufficient permissions.`);
  193. cache.hget('sessions', session.sessionId, next);
  194. },
  195. (session, next) => {
  196. if (!session) return next(`Insufficient permissions.`);
  197. db.models.user.findOne({_id: session.userId}, next);
  198. },
  199. (user, next) => {
  200. if (!user) return next(`Insufficient permissions.`);
  201. if (user.role === 'admin') return next(true);
  202. if (station.type === 'official') return next(`Insufficient permissions.`);
  203. if (station.owner === session.userId) return next(true);
  204. next(`Insufficient permissions.`);
  205. }
  206. ], (err) => {
  207. station.userCount = usersPerStationCount[station._id] || 0;
  208. if (err === true) resultStations.push(station);
  209. next();
  210. });
  211. }, () => {
  212. next(null, resultStations);
  213. });
  214. }
  215. ], async (err, stations) => {
  216. if (err) {
  217. err = await utils.getError(err);
  218. logger.error("STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  219. return cb({'status': 'failure', 'message': err});
  220. }
  221. logger.success("STATIONS_INDEX", `Indexing stations successful.`, false);
  222. return cb({'status': 'success', 'stations': stations});
  223. });
  224. },
  225. /**
  226. * Finds a station by name
  227. *
  228. * @param session
  229. * @param stationName - the station name
  230. * @param cb
  231. */
  232. findByName: (session, stationName, cb) => {
  233. async.waterfall([
  234. (next) => {
  235. stations.getStationByName(stationName, next);
  236. },
  237. (station, next) => {
  238. if (!station) return next('Station not found.');
  239. next(null, station);
  240. }
  241. ], async (err, station) => {
  242. if (err) {
  243. err = await utils.getError(err);
  244. logger.error("STATIONS_FIND_BY_NAME", `Finding station "${stationName}" failed. "${err}"`);
  245. return cb({'status': 'failure', 'message': err});
  246. }
  247. logger.success("STATIONS_FIND_BY_NAME", `Found station "${stationName}" successfully.`, false);
  248. cb({status: 'success', data: station});
  249. });
  250. },
  251. /**
  252. * Gets the official playlist for a station
  253. *
  254. * @param session
  255. * @param stationId - the station id
  256. * @param cb
  257. */
  258. getPlaylist: (session, stationId, cb) => {
  259. async.waterfall([
  260. (next) => {
  261. stations.getStation(stationId, next);
  262. },
  263. (station, next) => {
  264. if (!station) return next('Station not found.');
  265. else if (station.type !== 'official') return next('This is not an official station.');
  266. else next();
  267. },
  268. (next) => {
  269. cache.hget('officialPlaylists', stationId, next);
  270. },
  271. (playlist, next) => {
  272. if (!playlist) return next('Playlist not found.');
  273. next(null, playlist);
  274. }
  275. ], async (err, playlist) => {
  276. if (err) {
  277. err = await utils.getError(err);
  278. logger.error("STATIONS_GET_PLAYLIST", `Getting playlist for station "${stationId}" failed. "${err}"`);
  279. return cb({ status: 'failure', message: err });
  280. } else {
  281. logger.success("STATIONS_GET_PLAYLIST", `Got playlist for station "${stationId}" successfully.`, false);
  282. cb({ status: 'success', data: playlist.songs });
  283. }
  284. });
  285. },
  286. /**
  287. * Joins the station by its name
  288. *
  289. * @param session
  290. * @param stationName - the station name
  291. * @param cb
  292. * @return {{ status: String, userCount: Integer }}
  293. */
  294. join: (session, stationName, cb) => {
  295. async.waterfall([
  296. (next) => {
  297. stations.getStationByName(stationName, next);
  298. },
  299. (station, next) => {
  300. if (!station) return next('Station not found.');
  301. async.waterfall([
  302. (next) => {
  303. if (station.privacy !== 'private') return next(true);
  304. if (!session.userId) return next('An error occurred while joining the station.');
  305. next();
  306. },
  307. (next) => {
  308. db.models.user.findOne({_id: session.userId}, next);
  309. },
  310. (user, next) => {
  311. if (!user) return next('An error occurred while joining the station.');
  312. if (user.role === 'admin') return next(true);
  313. if (station.type === 'official') return next('An error occurred while joining the station.');
  314. if (station.owner === session.userId) return next(true);
  315. next('An error occurred while joining the station.');
  316. }
  317. ], async (err) => {
  318. if (err === true) return next(null, station);
  319. next(await utils.getError(err));
  320. });
  321. },
  322. (station, next) => {
  323. utils.socketJoinRoom(session.socketId, `station.${station._id}`);
  324. let data = {
  325. _id: station._id,
  326. type: station.type,
  327. currentSong: station.currentSong,
  328. startedAt: station.startedAt,
  329. paused: station.paused,
  330. timePaused: station.timePaused,
  331. pausedAt: station.pausedAt,
  332. description: station.description,
  333. displayName: station.displayName,
  334. privacy: station.privacy,
  335. locked: station.locked,
  336. partyMode: station.partyMode,
  337. owner: station.owner,
  338. privatePlaylist: station.privatePlaylist
  339. };
  340. userList[session.socketId] = station._id;
  341. next(null, data);
  342. },
  343. (data, next) => {
  344. data.userCount = usersPerStationCount[data._id] || 0;
  345. data.users = usersPerStation[data._id] || [];
  346. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  347. utils.socketJoinSongRoom(session.socketId, `song.${data.currentSong.songId}`);
  348. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  349. songs.getSongFromId(data.currentSong.songId, (err, song) => {
  350. if (!err && song) {
  351. data.currentSong.likes = song.likes;
  352. data.currentSong.dislikes = song.dislikes;
  353. } else {
  354. data.currentSong.likes = -1;
  355. data.currentSong.dislikes = -1;
  356. }
  357. next(null, data);
  358. });
  359. }
  360. ], async (err, data) => {
  361. if (err) {
  362. err = await utils.getError(err);
  363. logger.error("STATIONS_JOIN", `Joining station "${stationName}" failed. "${err}"`);
  364. return cb({'status': 'failure', 'message': err});
  365. }
  366. logger.success("STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  367. cb({status: 'success', data});
  368. });
  369. },
  370. /**
  371. * Toggles if a station is locked
  372. *
  373. * @param session
  374. * @param stationId - the station id
  375. * @param cb
  376. */
  377. toggleLock: hooks.ownerRequired((session, stationId, cb) => {
  378. async.waterfall([
  379. (next) => {
  380. stations.getStation(stationId, next);
  381. },
  382. (station, next) => {
  383. db.models.station.updateOne({ _id: stationId }, { $set: { locked: !station.locked} }, next);
  384. },
  385. (res, next) => {
  386. stations.updateStation(stationId, next);
  387. }
  388. ], async (err, station) => {
  389. if (err) {
  390. err = await utils.getError(err);
  391. logger.error("STATIONS_UPDATE_LOCKED_STATUS", `Toggling the queue lock for station "${stationId}" failed. "${err}"`);
  392. return cb({ status: 'failure', message: err });
  393. } else {
  394. logger.success("STATIONS_UPDATE_LOCKED_STATUS", `Toggled the queue lock for station "${stationId}" successfully to "${station.locked}".`);
  395. cache.pub('station.queueLockToggled', {stationId, locked: station.locked});
  396. return cb({ status: 'success', data: station.locked });
  397. }
  398. });
  399. }),
  400. /**
  401. * Votes to skip a station
  402. *
  403. * @param session
  404. * @param stationId - the station id
  405. * @param cb
  406. * @param userId
  407. */
  408. voteSkip: hooks.loginRequired((session, stationId, cb, userId) => {
  409. async.waterfall([
  410. (next) => {
  411. stations.getStation(stationId, next);
  412. },
  413. (station, next) => {
  414. if (!station) return next('Station not found.');
  415. utils.canUserBeInStation(station, userId, (canBe) => {
  416. if (canBe) return next(null, station);
  417. return next('Insufficient permissions.');
  418. });
  419. },
  420. (station, next) => {
  421. if (!station.currentSong) return next('There is currently no song to skip.');
  422. if (station.currentSong.skipVotes.indexOf(userId) !== -1) return next('You have already voted to skip this song.');
  423. next(null, station);
  424. },
  425. (station, next) => {
  426. db.models.station.updateOne({_id: stationId}, {$push: {"currentSong.skipVotes": userId}}, next)
  427. },
  428. (res, next) => {
  429. stations.updateStation(stationId, next);
  430. },
  431. (station, next) => {
  432. if (!station) return next('Station not found.');
  433. next(null, station);
  434. }
  435. ], async (err, station) => {
  436. if (err) {
  437. err = await utils.getError(err);
  438. logger.error("STATIONS_VOTE_SKIP", `Vote skipping station "${stationId}" failed. "${err}"`);
  439. return cb({'status': 'failure', 'message': err});
  440. }
  441. logger.success("STATIONS_VOTE_SKIP", `Vote skipping "${stationId}" successful.`);
  442. cache.pub('station.voteSkipSong', stationId);
  443. if (station.currentSong && station.currentSong.skipVotes.length >= 3) stations.skipStation(stationId)();
  444. cb({ status: 'success', message: 'Successfully voted to skip the song.' });
  445. });
  446. }),
  447. /**
  448. * Force skips a station
  449. *
  450. * @param session
  451. * @param stationId - the station id
  452. * @param cb
  453. */
  454. forceSkip: hooks.ownerRequired((session, stationId, cb) => {
  455. async.waterfall([
  456. (next) => {
  457. stations.getStation(stationId, next);
  458. },
  459. (station, next) => {
  460. if (!station) return next('Station not found.');
  461. next();
  462. }
  463. ], async (err) => {
  464. if (err) {
  465. err = await utils.getError(err);
  466. logger.error("STATIONS_FORCE_SKIP", `Force skipping station "${stationId}" failed. "${err}"`);
  467. return cb({'status': 'failure', 'message': err});
  468. }
  469. notifications.unschedule(`stations.nextSong?id=${stationId}`);
  470. stations.skipStation(stationId)();
  471. logger.success("STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  472. return cb({'status': 'success', 'message': 'Successfully skipped station.'});
  473. });
  474. }),
  475. /**
  476. * Leaves the user's current station
  477. *
  478. * @param session
  479. * @param stationId
  480. * @param cb
  481. * @return {{ status: String, userCount: Integer }}
  482. */
  483. leave: (session, stationId, cb) => {
  484. async.waterfall([
  485. (next) => {
  486. stations.getStation(stationId, next);
  487. },
  488. (station, next) => {
  489. if (!station) return next('Station not found.');
  490. next();
  491. }
  492. ], async (err, userCount) => {
  493. if (err) {
  494. err = await utils.getError(err);
  495. logger.error("STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  496. return cb({'status': 'failure', 'message': err});
  497. }
  498. logger.success("STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  499. utils.socketLeaveRooms(session);
  500. delete userList[session.socketId];
  501. return cb({'status': 'success', 'message': 'Successfully left station.', userCount});
  502. });
  503. },
  504. /**
  505. * Updates a station's name
  506. *
  507. * @param session
  508. * @param stationId - the station id
  509. * @param newName - the new station name
  510. * @param cb
  511. */
  512. updateName: hooks.ownerRequired((session, stationId, newName, cb) => {
  513. async.waterfall([
  514. (next) => {
  515. db.models.station.updateOne({_id: stationId}, {$set: {name: newName}}, {runValidators: true}, next);
  516. },
  517. (res, next) => {
  518. stations.updateStation(stationId, next);
  519. }
  520. ], async (err) => {
  521. if (err) {
  522. err = await utils.getError(err);
  523. logger.error("STATIONS_UPDATE_DISPLAY_NAME", `Updating station "${stationId}" displayName to "${newName}" failed. "${err}"`);
  524. return cb({'status': 'failure', 'message': err});
  525. }
  526. logger.success("STATIONS_UPDATE_DISPLAY_NAME", `Updated station "${stationId}" displayName to "${newName}" successfully.`);
  527. return cb({'status': 'success', 'message': 'Successfully updated the name.'});
  528. });
  529. }),
  530. /**
  531. * Updates a station's display name
  532. *
  533. * @param session
  534. * @param stationId - the station id
  535. * @param newDisplayName - the new station display name
  536. * @param cb
  537. */
  538. updateDisplayName: hooks.ownerRequired((session, stationId, newDisplayName, cb) => {
  539. async.waterfall([
  540. (next) => {
  541. db.models.station.updateOne({_id: stationId}, {$set: {displayName: newDisplayName}}, {runValidators: true}, next);
  542. },
  543. (res, next) => {
  544. stations.updateStation(stationId, next);
  545. }
  546. ], async (err) => {
  547. if (err) {
  548. err = await utils.getError(err);
  549. logger.error("STATIONS_UPDATE_DISPLAY_NAME", `Updating station "${stationId}" displayName to "${newDisplayName}" failed. "${err}"`);
  550. return cb({'status': 'failure', 'message': err});
  551. }
  552. logger.success("STATIONS_UPDATE_DISPLAY_NAME", `Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`);
  553. return cb({'status': 'success', 'message': 'Successfully updated the display name.'});
  554. });
  555. }),
  556. /**
  557. * Updates a station's description
  558. *
  559. * @param session
  560. * @param stationId - the station id
  561. * @param newDescription - the new station description
  562. * @param cb
  563. */
  564. updateDescription: hooks.ownerRequired((session, stationId, newDescription, cb) => {
  565. async.waterfall([
  566. (next) => {
  567. db.models.station.updateOne({_id: stationId}, {$set: {description: newDescription}}, {runValidators: true}, next);
  568. },
  569. (res, next) => {
  570. stations.updateStation(stationId, next);
  571. }
  572. ], async (err) => {
  573. if (err) {
  574. err = await utils.getError(err);
  575. logger.error("STATIONS_UPDATE_DESCRIPTION", `Updating station "${stationId}" description to "${newDescription}" failed. "${err}"`);
  576. return cb({'status': 'failure', 'message': err});
  577. }
  578. logger.success("STATIONS_UPDATE_DESCRIPTION", `Updated station "${stationId}" description to "${newDescription}" successfully.`);
  579. return cb({'status': 'success', 'message': 'Successfully updated the description.'});
  580. });
  581. }),
  582. /**
  583. * Updates a station's privacy
  584. *
  585. * @param session
  586. * @param stationId - the station id
  587. * @param newPrivacy - the new station privacy
  588. * @param cb
  589. */
  590. updatePrivacy: hooks.ownerRequired((session, stationId, newPrivacy, cb) => {
  591. async.waterfall([
  592. (next) => {
  593. db.models.station.updateOne({_id: stationId}, {$set: {privacy: newPrivacy}}, {runValidators: true}, next);
  594. },
  595. (res, next) => {
  596. stations.updateStation(stationId, next);
  597. }
  598. ], async (err) => {
  599. if (err) {
  600. err = await utils.getError(err);
  601. logger.error("STATIONS_UPDATE_PRIVACY", `Updating station "${stationId}" privacy to "${newPrivacy}" failed. "${err}"`);
  602. return cb({'status': 'failure', 'message': err});
  603. }
  604. logger.success("STATIONS_UPDATE_PRIVACY", `Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`);
  605. return cb({'status': 'success', 'message': 'Successfully updated the privacy.'});
  606. });
  607. }),
  608. /**
  609. * Updates a station's genres
  610. *
  611. * @param session
  612. * @param stationId - the station id
  613. * @param newGenres - the new station genres
  614. * @param cb
  615. */
  616. updateGenres: hooks.ownerRequired((session, stationId, newGenres, cb) => {
  617. async.waterfall([
  618. (next) => {
  619. db.models.station.updateOne({_id: stationId}, {$set: {genres: newGenres}}, {runValidators: true}, next);
  620. },
  621. (res, next) => {
  622. stations.updateStation(stationId, next);
  623. }
  624. ], async (err) => {
  625. if (err) {
  626. err = await utils.getError(err);
  627. logger.error("STATIONS_UPDATE_GENRES", `Updating station "${stationId}" genres to "${newGenres}" failed. "${err}"`);
  628. return cb({'status': 'failure', 'message': err});
  629. }
  630. logger.success("STATIONS_UPDATE_GENRES", `Updated station "${stationId}" genres to "${newGenres}" successfully.`);
  631. return cb({'status': 'success', 'message': 'Successfully updated the genres.'});
  632. });
  633. }),
  634. /**
  635. * Updates a station's blacklisted genres
  636. *
  637. * @param session
  638. * @param stationId - the station id
  639. * @param newBlacklistedGenres - the new station blacklisted genres
  640. * @param cb
  641. */
  642. updateBlacklistedGenres: hooks.ownerRequired((session, stationId, newBlacklistedGenres, cb) => {
  643. async.waterfall([
  644. (next) => {
  645. db.models.station.updateOne({_id: stationId}, {$set: {blacklistedGenres: newBlacklistedGenres}}, {runValidators: true}, next);
  646. },
  647. (res, next) => {
  648. stations.updateStation(stationId, next);
  649. }
  650. ], async (err) => {
  651. if (err) {
  652. err = await utils.getError(err);
  653. logger.error("STATIONS_UPDATE_BLACKLISTED_GENRES", `Updating station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" failed. "${err}"`);
  654. return cb({'status': 'failure', 'message': err});
  655. }
  656. logger.success("STATIONS_UPDATE_BLACKLISTED_GENRES", `Updated station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" successfully.`);
  657. return cb({'status': 'success', 'message': 'Successfully updated the blacklisted genres.'});
  658. });
  659. }),
  660. /**
  661. * Updates a station's party mode
  662. *
  663. * @param session
  664. * @param stationId - the station id
  665. * @param newPartyMode - the new station party mode
  666. * @param cb
  667. */
  668. updatePartyMode: hooks.ownerRequired((session, stationId, newPartyMode, cb) => {
  669. async.waterfall([
  670. (next) => {
  671. stations.getStation(stationId, next);
  672. },
  673. (station, next) => {
  674. if (!station) return next('Station not found.');
  675. if (station.partyMode === newPartyMode) return next('The party mode was already ' + ((newPartyMode) ? 'enabled.' : 'disabled.'));
  676. db.models.station.updateOne({_id: stationId}, {$set: {partyMode: newPartyMode}}, {runValidators: true}, next);
  677. },
  678. (res, next) => {
  679. stations.updateStation(stationId, next);
  680. }
  681. ], async (err) => {
  682. if (err) {
  683. err = await utils.getError(err);
  684. logger.error("STATIONS_UPDATE_PARTY_MODE", `Updating station "${stationId}" party mode to "${newPartyMode}" failed. "${err}"`);
  685. return cb({'status': 'failure', 'message': err});
  686. }
  687. logger.success("STATIONS_UPDATE_PARTY_MODE", `Updated station "${stationId}" party mode to "${newPartyMode}" successfully.`);
  688. cache.pub('station.updatePartyMode', {stationId: stationId, partyMode: newPartyMode});
  689. stations.skipStation(stationId)();
  690. return cb({'status': 'success', 'message': 'Successfully updated the party mode.'});
  691. });
  692. }),
  693. /**
  694. * Pauses a station
  695. *
  696. * @param session
  697. * @param stationId - the station id
  698. * @param cb
  699. */
  700. pause: hooks.ownerRequired((session, stationId, cb) => {
  701. async.waterfall([
  702. (next) => {
  703. stations.getStation(stationId, next);
  704. },
  705. (station, next) => {
  706. if (!station) return next('Station not found.');
  707. if (station.paused) return next('That station was already paused.');
  708. db.models.station.updateOne({_id: stationId}, {$set: {paused: true, pausedAt: Date.now()}}, next);
  709. },
  710. (res, next) => {
  711. stations.updateStation(stationId, next);
  712. }
  713. ], async (err) => {
  714. if (err) {
  715. err = await utils.getError(err);
  716. logger.error("STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  717. return cb({'status': 'failure', 'message': err});
  718. }
  719. logger.success("STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  720. cache.pub('station.pause', stationId);
  721. notifications.unschedule(`stations.nextSong?id=${stationId}`);
  722. return cb({'status': 'success', 'message': 'Successfully paused.'});
  723. });
  724. }),
  725. /**
  726. * Resumes a station
  727. *
  728. * @param session
  729. * @param stationId - the station id
  730. * @param cb
  731. */
  732. resume: hooks.ownerRequired((session, stationId, cb) => {
  733. async.waterfall([
  734. (next) => {
  735. stations.getStation(stationId, next);
  736. },
  737. (station, next) => {
  738. if (!station) return next('Station not found.');
  739. if (!station.paused) return next('That station is not paused.');
  740. station.timePaused += (Date.now() - station.pausedAt);
  741. db.models.station.updateOne({_id: stationId}, {$set: {paused: false}, $inc: {timePaused: Date.now() - station.pausedAt}}, next);
  742. },
  743. (res, next) => {
  744. stations.updateStation(stationId, next);
  745. }
  746. ], async (err) => {
  747. if (err) {
  748. err = await utils.getError(err);
  749. logger.error("STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  750. return cb({'status': 'failure', 'message': err});
  751. }
  752. logger.success("STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  753. cache.pub('station.resume', stationId);
  754. return cb({'status': 'success', 'message': 'Successfully resumed.'});
  755. });
  756. }),
  757. /**
  758. * Removes a station
  759. *
  760. * @param session
  761. * @param stationId - the station id
  762. * @param cb
  763. */
  764. remove: hooks.ownerRequired((session, stationId, cb) => {
  765. async.waterfall([
  766. (next) => {
  767. db.models.station.deleteOne({ _id: stationId }, err => next(err));
  768. },
  769. (next) => {
  770. cache.hdel('stations', stationId, err => next(err));
  771. }
  772. ], async (err) => {
  773. if (err) {
  774. err = await utils.getError(err);
  775. logger.error("STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  776. return cb({ 'status': 'failure', 'message': err });
  777. }
  778. logger.success("STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  779. cache.pub('station.remove', stationId);
  780. return cb({ 'status': 'success', 'message': 'Successfully removed.' });
  781. });
  782. }),
  783. /**
  784. * Create a station
  785. *
  786. * @param session
  787. * @param data - the station data
  788. * @param cb
  789. * @param userId
  790. */
  791. create: hooks.loginRequired((session, data, cb, userId) => {
  792. data.name = data.name.toLowerCase();
  793. let blacklist = ["country", "edm", "musare", "hip-hop", "rap", "top-hits", "todays-hits", "old-school", "christmas", "about", "support", "staff", "help", "news", "terms", "privacy", "profile", "c", "community", "tos", "login", "register", "p", "official", "o", "trap", "faq", "team", "donate", "buy", "shop", "forums", "explore", "settings", "admin", "auth", "reset_password"];
  794. async.waterfall([
  795. (next) => {
  796. if (!data) return next('Invalid data.');
  797. next();
  798. },
  799. (next) => {
  800. db.models.station.findOne({ $or: [{name: data.name}, {displayName: new RegExp(`^${data.displayName}$`, 'i')}] }, next);
  801. },
  802. (station, next) => {
  803. if (station) return next('A station with that name or display name already exists.');
  804. const { name, displayName, description, genres, playlist, type, blacklistedGenres } = data;
  805. if (type === 'official') {
  806. db.models.user.findOne({_id: userId}, (err, user) => {
  807. if (err) return next(err);
  808. if (!user) return next('User not found.');
  809. if (user.role !== 'admin') return next('Admin required.');
  810. db.models.station.create({
  811. name,
  812. displayName,
  813. description,
  814. type,
  815. privacy: 'private',
  816. playlist,
  817. genres,
  818. blacklistedGenres,
  819. currentSong: stations.defaultSong
  820. }, next);
  821. });
  822. } else if (type === 'community') {
  823. if (blacklist.indexOf(name) !== -1) return next('That name is blacklisted. Please use a different name.');
  824. db.models.station.create({
  825. name,
  826. displayName,
  827. description,
  828. type,
  829. privacy: 'private',
  830. owner: userId,
  831. queue: [],
  832. currentSong: null
  833. }, next);
  834. }
  835. }
  836. ], async (err, station) => {
  837. if (err) {
  838. err = await utils.getError(err);
  839. logger.error("STATIONS_CREATE", `Creating station failed. "${err}"`);
  840. return cb({'status': 'failure', 'message': err});
  841. }
  842. logger.success("STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  843. cache.pub('station.create', station._id);
  844. return cb({'status': 'success', 'message': 'Successfully created station.'});
  845. });
  846. }),
  847. /**
  848. * Adds song to station queue
  849. *
  850. * @param session
  851. * @param stationId - the station id
  852. * @param songId - the song id
  853. * @param cb
  854. * @param userId
  855. */
  856. addToQueue: hooks.loginRequired((session, stationId, songId, cb, userId) => {
  857. async.waterfall([
  858. (next) => {
  859. stations.getStation(stationId, next);
  860. },
  861. (station, next) => {
  862. if (!station) return next('Station not found.');
  863. if (station.locked) {
  864. db.models.user.findOne({ _id: userId }, (err, user) => {
  865. if (user.role !== 'admin' && station.owner !== userId) return next('Only owners and admins can add songs to a locked queue.');
  866. else return next(null, station);
  867. });
  868. } else {
  869. return next(null, station);
  870. }
  871. },
  872. (station, next) => {
  873. if (station.type !== 'community') return next('That station is not a community station.');
  874. utils.canUserBeInStation(station, userId, (canBe) => {
  875. if (canBe) return next(null, station);
  876. return next('Insufficient permissions.');
  877. });
  878. },
  879. (station, next) => {
  880. if (station.currentSong && station.currentSong.songId === songId) return next('That song is currently playing.');
  881. async.each(station.queue, (queueSong, next) => {
  882. if (queueSong.songId === songId) return next('That song is already in the queue.');
  883. next();
  884. }, (err) => {
  885. next(err, station);
  886. });
  887. },
  888. (station, next) => {
  889. songs.getSong(songId, (err, song) => {
  890. if (!err && song) return next(null, song, station);
  891. utils.getSongFromYouTube(songId, (song) => {
  892. song.artists = [];
  893. song.skipDuration = 0;
  894. song.likes = -1;
  895. song.dislikes = -1;
  896. song.thumbnail = "empty";
  897. song.explicit = false;
  898. next(null, song, station);
  899. });
  900. });
  901. },
  902. (song, station, next) => {
  903. let queue = station.queue;
  904. song.requestedBy = userId;
  905. queue.push(song);
  906. let totalDuration = 0;
  907. queue.forEach((song) => {
  908. totalDuration += song.duration;
  909. });
  910. if (totalDuration >= 3600 * 3) return next('The max length of the queue is 3 hours.');
  911. next(null, song, station);
  912. },
  913. (song, station, next) => {
  914. let queue = station.queue;
  915. if (queue.length === 0) return next(null, song, station);
  916. let totalDuration = 0;
  917. const userId = queue[queue.length - 1].requestedBy;
  918. station.queue.forEach((song) => {
  919. if (userId === song.requestedBy) {
  920. totalDuration += song.duration;
  921. }
  922. });
  923. if(totalDuration >= 900) return next('The max length of songs per user is 15 minutes.');
  924. next(null, song, station);
  925. },
  926. (song, station, next) => {
  927. let queue = station.queue;
  928. if (queue.length === 0) return next(null, song);
  929. let totalSongs = 0;
  930. const userId = queue[queue.length - 1].requestedBy;
  931. queue.forEach((song) => {
  932. if (userId === song.requestedBy) {
  933. totalSongs++;
  934. }
  935. });
  936. if (totalSongs <= 2) return next(null, song);
  937. if (totalSongs > 3) return next('The max amount of songs per user is 3, and only 2 in a row is allowed.');
  938. if (queue[queue.length - 2].requestedBy !== userId || queue[queue.length - 3] !== userId) return next('The max amount of songs per user is 3, and only 2 in a row is allowed.');
  939. next(null, song);
  940. },
  941. (song, next) => {
  942. db.models.station.updateOne({_id: stationId}, {$push: {queue: song}}, {runValidators: true}, next);
  943. },
  944. (res, next) => {
  945. stations.updateStation(stationId, next);
  946. }
  947. ], async (err, station) => {
  948. if (err) {
  949. err = await utils.getError(err);
  950. logger.error("STATIONS_ADD_SONG_TO_QUEUE", `Adding song "${songId}" to station "${stationId}" queue failed. "${err}"`);
  951. return cb({'status': 'failure', 'message': err});
  952. }
  953. logger.success("STATIONS_ADD_SONG_TO_QUEUE", `Added song "${songId}" to station "${stationId}" successfully.`);
  954. cache.pub('station.queueUpdate', stationId);
  955. return cb({'status': 'success', 'message': 'Successfully added song to queue.'});
  956. });
  957. }),
  958. /**
  959. * Removes song from station queue
  960. *
  961. * @param session
  962. * @param stationId - the station id
  963. * @param songId - the song id
  964. * @param cb
  965. * @param userId
  966. */
  967. removeFromQueue: hooks.ownerRequired((session, stationId, songId, cb, userId) => {
  968. async.waterfall([
  969. (next) => {
  970. if (!songId) return next('Invalid song id.');
  971. stations.getStation(stationId, next);
  972. },
  973. (station, next) => {
  974. if (!station) return next('Station not found.');
  975. if (station.type !== 'community') return next('Station is not a community station.');
  976. async.each(station.queue, (queueSong, next) => {
  977. if (queueSong.songId === songId) return next(true);
  978. next();
  979. }, (err) => {
  980. if (err === true) return next();
  981. next('Song is not currently in the queue.');
  982. });
  983. },
  984. (next) => {
  985. db.models.station.updateOne({_id: stationId}, {$pull: {queue: {songId: songId}}}, next);
  986. },
  987. (res, next) => {
  988. stations.updateStation(stationId, next);
  989. }
  990. ], async (err, station) => {
  991. if (err) {
  992. err = await utils.getError(err);
  993. logger.error("STATIONS_REMOVE_SONG_TO_QUEUE", `Removing song "${songId}" from station "${stationId}" queue failed. "${err}"`);
  994. return cb({'status': 'failure', 'message': err});
  995. }
  996. logger.success("STATIONS_REMOVE_SONG_TO_QUEUE", `Removed song "${songId}" from station "${stationId}" successfully.`);
  997. cache.pub('station.queueUpdate', stationId);
  998. return cb({'status': 'success', 'message': 'Successfully removed song from queue.'});
  999. });
  1000. }),
  1001. /**
  1002. * Gets the queue from a station
  1003. *
  1004. * @param session
  1005. * @param stationId - the station id
  1006. * @param cb
  1007. */
  1008. getQueue: (session, stationId, cb) => {
  1009. async.waterfall([
  1010. (next) => {
  1011. stations.getStation(stationId, next);
  1012. },
  1013. (station, next) => {
  1014. if (!station) return next('Station not found.');
  1015. if (station.type !== 'community') return next('Station is not a community station.');
  1016. next(null, station);
  1017. },
  1018. (station, next) => {
  1019. utils.canUserBeInStation(station, session.userId, (canBe) => {
  1020. if (canBe) return next(null, station);
  1021. return next('Insufficient permissions.');
  1022. });
  1023. }
  1024. ], async (err, station) => {
  1025. if (err) {
  1026. err = await utils.getError(err);
  1027. logger.error("STATIONS_GET_QUEUE", `Getting queue for station "${stationId}" failed. "${err}"`);
  1028. return cb({'status': 'failure', 'message': err});
  1029. }
  1030. logger.success("STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  1031. return cb({'status': 'success', 'message': 'Successfully got queue.', queue: station.queue});
  1032. });
  1033. },
  1034. /**
  1035. * Selects a private playlist for a station
  1036. *
  1037. * @param session
  1038. * @param stationId - the station id
  1039. * @param playlistId - the private playlist id
  1040. * @param cb
  1041. * @param userId
  1042. */
  1043. selectPrivatePlaylist: hooks.ownerRequired((session, stationId, playlistId, cb, userId) => {
  1044. async.waterfall([
  1045. (next) => {
  1046. stations.getStation(stationId, next);
  1047. },
  1048. (station, next) => {
  1049. if (!station) return next('Station not found.');
  1050. if (station.type !== 'community') return next('Station is not a community station.');
  1051. if (station.privatePlaylist === playlistId) return next('That private playlist is already selected.');
  1052. db.models.playlist.findOne({_id: playlistId}, next);
  1053. },
  1054. (playlist, next) => {
  1055. if (!playlist) return next('Playlist not found.');
  1056. let currentSongIndex = (playlist.songs.length > 0) ? playlist.songs.length - 1 : 0;
  1057. db.models.station.updateOne({_id: stationId}, {$set: {privatePlaylist: playlistId, currentSongIndex: currentSongIndex}}, {runValidators: true}, next);
  1058. },
  1059. (res, next) => {
  1060. stations.updateStation(stationId, next);
  1061. }
  1062. ], async (err, station) => {
  1063. if (err) {
  1064. err = await utils.getError(err);
  1065. logger.error("STATIONS_SELECT_PRIVATE_PLAYLIST", `Selecting private playlist "${playlistId}" for station "${stationId}" failed. "${err}"`);
  1066. return cb({'status': 'failure', 'message': err});
  1067. }
  1068. logger.success("STATIONS_SELECT_PRIVATE_PLAYLIST", `Selected private playlist "${playlistId}" for station "${stationId}" successfully.`);
  1069. notifications.unschedule(`stations.nextSong?id${stationId}`);
  1070. if (!station.partyMode) stations.skipStation(stationId)();
  1071. cache.pub('privatePlaylist.selected', {playlistId, stationId});
  1072. return cb({'status': 'success', 'message': 'Successfully selected playlist.'});
  1073. });
  1074. }),
  1075. favoriteStation: hooks.loginRequired((session, stationId, cb, userId) => {
  1076. async.waterfall([
  1077. (next) => {
  1078. stations.getStation(stationId, next);
  1079. },
  1080. (station, next) => {
  1081. if (!station) return next('Station not found.');
  1082. async.waterfall([
  1083. (next) => {
  1084. if (station.privacy !== 'private') return next(true);
  1085. if (!session.userId) return next("You're not allowed to favorite this station.");
  1086. next();
  1087. },
  1088. (next) => {
  1089. db.models.user.findOne({ _id: userId }, next);
  1090. },
  1091. (user, next) => {
  1092. if (!user) return next("You're not allowed to favorite this station.");
  1093. if (user.role === 'admin') return next(true);
  1094. if (station.type === 'official') return next("You're not allowed to favorite this station.");
  1095. if (station.owner === session.userId) return next(true);
  1096. next("You're not allowed to favorite this station.");
  1097. }
  1098. ], (err) => {
  1099. if (err === true) return next(null);
  1100. next(utils.getError(err));
  1101. });
  1102. },
  1103. (next) => {
  1104. db.models.user.updateOne({ _id: userId }, { $addToSet: { favoriteStations: stationId } }, next);
  1105. },
  1106. (res, next) => {
  1107. if (res.nModified === 0) return next("The station was already favorited.");
  1108. next();
  1109. }
  1110. ], async (err) => {
  1111. if (err) {
  1112. err = await utils.getError(err);
  1113. logger.error("FAVORITE_STATION", `Favoriting station "${stationId}" failed. "${err}"`);
  1114. return cb({'status': 'failure', 'message': err});
  1115. }
  1116. logger.success("FAVORITE_STATION", `Favorited station "${stationId}" successfully.`);
  1117. cache.pub('user.favoritedStation', { userId, stationId });
  1118. return cb({'status': 'success', 'message': 'Succesfully favorited station.'});
  1119. });
  1120. }),
  1121. unfavoriteStation: hooks.loginRequired((session, stationId, cb, userId) => {
  1122. async.waterfall([
  1123. (next) => {
  1124. db.models.user.updateOne({ _id: userId }, { $pull: { favoriteStations: stationId } }, next);
  1125. },
  1126. (res, next) => {
  1127. if (res.nModified === 0) return next("The station wasn't favorited.");
  1128. next();
  1129. }
  1130. ], async (err) => {
  1131. if (err) {
  1132. err = await utils.getError(err);
  1133. logger.error("UNFAVORITE_STATION", `Unfavoriting station "${stationId}" failed. "${err}"`);
  1134. return cb({'status': 'failure', 'message': err});
  1135. }
  1136. logger.success("UNFAVORITE_STATION", `Unfavorited station "${stationId}" successfully.`);
  1137. cache.pub('user.unfavoritedStation', { userId, stationId });
  1138. return cb({'status': 'success', 'message': 'Succesfully unfavorited station.'});
  1139. });
  1140. }),
  1141. };