stations.js 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071
  1. 'use strict';
  2. const async = require('async'),
  3. request = require('request'),
  4. config = require('config'),
  5. _ = require('underscore')._;
  6. const io = require('../io');
  7. const db = require('../db');
  8. const cache = require('../cache');
  9. const notifications = require('../notifications');
  10. const utils = require('../utils');
  11. const logger = require('../logger');
  12. const stations = require('../stations');
  13. const songs = require('../songs');
  14. const hooks = require('./hooks');
  // Socket id -> id of the station that socket joined through `join`.
  let userList = {};
  // Station id -> usernames found in that station by the most recent sweep.
  let usersPerStation = {};
  // Station id -> number of sockets found in that station by the most recent sweep.
  let usersPerStationCount = {};

  /**
   * Presence sweep, run every 3 seconds.
   *
   * Rebuilds `usersPerStation` and `usersPerStationCount` from `userList`,
   * drops sockets that are gone or no longer in their station room, and
   * publishes `station.updateUserCount` / `station.updateUsers` for every
   * station whose count or user list differs from the previous sweep.
   */
  setInterval(() => {
    const stationsCountUpdated = new Set();
    const stationsUpdated = new Set();
    const oldUsersPerStation = usersPerStation;
    const oldUsersPerStationCount = usersPerStationCount;
    // Keep local references to this sweep's maps: a slow cache/db callback
    // must never write into the maps created by a later sweep.
    const newUsersPerStation = {};
    const newUsersPerStationCount = {};
    usersPerStation = newUsersPerStation;
    usersPerStationCount = newUsersPerStationCount;

    async.each(Object.keys(userList), (socketId, next) => {
      const socket = utils.socketFromSession(socketId);
      const stationId = userList[socketId];

      // Socket vanished or left the room without calling `leave`.
      if (!socket || Object.keys(socket.rooms).indexOf(`station.${stationId}`) === -1) {
        stationsCountUpdated.add(stationId);
        stationsUpdated.add(stationId);
        delete userList[socketId];
        return next();
      }

      newUsersPerStationCount[stationId] = (newUsersPerStationCount[stationId] || 0) + 1;
      if (!newUsersPerStation[stationId]) newUsersPerStation[stationId] = [];

      // Resolve socket -> session -> user so the username can be listed.
      async.waterfall([
        (next) => {
          if (!socket.session || !socket.session.sessionId) return next('No session found.');
          cache.hget('sessions', socket.session.sessionId, next);
        },

        (session, next) => {
          if (!session) return next('Session not found.');
          db.models.user.findOne({_id: session.userId}, next);
        },

        (user, next) => {
          if (!user) return next('User not found.');
          if (newUsersPerStation[stationId].indexOf(user.username) !== -1) return next('User already in the list.');
          next(null, user.username);
        }
      ], (err, username) => {
        // Anonymous or duplicate users still count, they are just not listed.
        if (!err) newUsersPerStation[stationId].push(username);
        next();
      });
    }, () => {
      // Compare over the union of old and new station ids: a station whose
      // last user left through `leave` is absent from the new maps and would
      // otherwise never have its drop to zero announced.
      const countIds = Object.keys(Object.assign({}, oldUsersPerStationCount, newUsersPerStationCount));
      countIds.forEach((stationId) => {
        const before = oldUsersPerStationCount[stationId] || 0;
        const after = newUsersPerStationCount[stationId] || 0;
        if (before !== after) stationsCountUpdated.add(stationId);
      });

      const listIds = Object.keys(Object.assign({}, oldUsersPerStation, newUsersPerStation));
      listIds.forEach((stationId) => {
        const before = oldUsersPerStation[stationId] || [];
        const after = newUsersPerStation[stationId] || [];
        if (_.difference(after, before).length > 0 || _.difference(before, after).length > 0) {
          stationsUpdated.add(stationId);
        }
      });

      stationsCountUpdated.forEach((stationId) => {
        console.log("Updating count of ", stationId);
        cache.pub('station.updateUserCount', stationId);
      });

      stationsUpdated.forEach((stationId) => {
        console.log("Updating ", stationId);
        cache.pub('station.updateUsers', stationId);
      });
    });
  }, 3000);
  // Sends the latest known user list of a station to everyone in its room.
  cache.sub('station.updateUsers', (stationId) => {
    const room = `station.${stationId}`;
    const currentUsers = usersPerStation[stationId] || [];
    utils.emitToRoom(room, "event:users.updated", currentUsers);
  });
  /**
   * Broadcasts a station's user count.
   *
   * The station room always receives the count. The 'home' room receives it
   * directly for public stations; for any other privacy only sockets whose
   * user is an admin, or the owner of a community station, are notified.
   */
  cache.sub('station.updateUserCount', stationId => {
    const count = usersPerStationCount[stationId] || 0;
    utils.emitToRoom(`station.${stationId}`, "event:userCount.updated", count);

    stations.getStation(stationId, (err, station) => {
      // The station may have been removed since the event was published.
      if (err || !station) return;

      if (station.privacy === 'public') {
        utils.emitToRoom('home', "event:userCount.updated", stationId, count);
        return;
      }

      const sockets = utils.getRoomSockets('home');
      for (let socketId in sockets) {
        const socket = sockets[socketId];
        const socketSession = socket.session;
        if (!socketSession || !socketSession.sessionId) continue;

        cache.hget('sessions', socketSession.sessionId, (err, session) => {
          if (err || !session) return;
          db.models.user.findOne({_id: session.userId}, (err, user) => {
            // A session can outlive its user; skip instead of throwing.
            if (err || !user) return;
            if (user.role === 'admin') socket.emit("event:userCount.updated", stationId, count);
            else if (station.type === "community" && station.owner === session.userId) socket.emit("event:userCount.updated", stationId, count);
          });
        });
      }
    });
  });
  // Relays a party-mode change to everyone in the affected station's room.
  cache.sub('station.updatePartyMode', (update) => {
    const room = `station.${update.stationId}`;
    utils.emitToRoom(room, "event:partyMode.updated", update.partyMode);
  });
  // Tells a station's room which private playlist has been selected.
  cache.sub('privatePlaylist.selected', (selection) => {
    const room = `station.${selection.stationId}`;
    utils.emitToRoom(room, "event:privatePlaylist.selected", selection.playlistId);
  });
  // Notifies a station's room that playback has been paused.
  cache.sub('station.pause', (stationId) => {
    const room = `station.${stationId}`;
    utils.emitToRoom(room, "event:stations.pause");
  });
  // Notifies a station's room that playback resumed, including the station's
  // accumulated `timePaused` value.
  cache.sub('station.resume', stationId => {
    stations.getStation(stationId, (err, station) => {
      // Nothing to announce if the station could not be loaded (e.g. removed).
      if (err || !station) return;
      utils.emitToRoom(`station.${stationId}`, "event:stations.resume", { timePaused: station.timePaused });
    });
  });
  // Pushes a station's current queue to everyone in its room.
  cache.sub('station.queueUpdate', stationId => {
    stations.getStation(stationId, (err, station) => {
      // Other callers in this file treat a missing station as a non-error
      // result, so guard against it here as well.
      if (err || !station) return;
      utils.emitToRoom(`station.${stationId}`, "event:queue.update", station.queue);
    });
  });
  // Tells a station's room that somebody voted to skip the current song.
  cache.sub('station.voteSkipSong', (stationId) => {
    const room = `station.${stationId}`;
    utils.emitToRoom(room, "event:song.voteSkipSong");
  });
  // Announces a removed station to its own room and to the admin station list.
  cache.sub('station.remove', (stationId) => {
    const stationRoom = `station.${stationId}`;
    utils.emitToRoom(stationRoom, 'event:stations.remove');
    utils.emitToRoom('admin.stations', 'event:admin.station.removed', stationId);
  });
  /**
   * Initializes a newly created station and announces it.
   *
   * The admin station list always receives the station. The 'home' room
   * receives it directly when it is public; otherwise only sockets whose user
   * is an admin, or the owner of a community station, are notified.
   */
  cache.sub('station.create', stationId => {
    stations.initializeStation(stationId, (err, station) => {
      if (err) console.error(err);
      // Check before touching `station`: on failure there may be no station.
      if (!station) return;

      station.userCount = usersPerStationCount[stationId] || 0;
      utils.emitToRoom('admin.stations', 'event:admin.station.added', station);

      // TODO If community, check if on whitelist
      if (station.privacy === 'public') {
        utils.emitToRoom('home', "event:stations.created", station);
        return;
      }

      const sockets = utils.getRoomSockets('home');
      for (let socketId in sockets) {
        const socket = sockets[socketId];
        const socketSession = socket.session;
        if (!socketSession || !socketSession.sessionId) continue;

        cache.hget('sessions', socketSession.sessionId, (err, session) => {
          if (err || !session) return;
          db.models.user.findOne({_id: session.userId}, (err, user) => {
            // A session can outlive its user; skip instead of throwing.
            if (err || !user) return;
            if (user.role === 'admin') socket.emit("event:stations.created", station);
            else if (station.type === "community" && station.owner === session.userId) socket.emit("event:stations.created", station);
          });
        });
      }
    });
  });
  160. module.exports = {
  161. /**
  162. * Get a list of all the stations
  163. *
  164. * @param session
  165. * @param cb
  166. * @return {{ status: String, stations: Array }}
  167. */
  168. index: (session, cb) => {
  169. async.waterfall([
  170. (next) => {
  171. cache.hgetall('stations', next);
  172. },
  173. (stations, next) => {
  174. let resultStations = [];
  175. for (let id in stations) {
  176. resultStations.push(stations[id]);
  177. }
  178. next(null, stations);
  179. },
  180. (stations, next) => {
  181. let resultStations = [];
  182. async.each(stations, (station, next) => {
  183. async.waterfall([
  184. (next) => {
  185. if (station.privacy === 'public') return next(true);
  186. if (!session.sessionId) return next(`Insufficient permissions.`);
  187. cache.hget('sessions', session.sessionId, next);
  188. },
  189. (session, next) => {
  190. if (!session) return next(`Insufficient permissions.`);
  191. db.models.user.findOne({_id: session.userId}, next);
  192. },
  193. (user, next) => {
  194. if (!user) return next(`Insufficient permissions.`);
  195. if (user.role === 'admin') return next(true);
  196. if (station.type === 'official') return next(`Insufficient permissions.`);
  197. if (station.owner === session.userId) return next(true);
  198. next(`Insufficient permissions.`);
  199. }
  200. ], (err) => {
  201. station.userCount = usersPerStationCount[station._id] || 0;
  202. if (err === true) resultStations.push(station);
  203. next();
  204. });
  205. }, () => {
  206. next(null, resultStations);
  207. });
  208. }
  209. ], (err, stations) => {
  210. if (err) {
  211. err = utils.getError(err);
  212. logger.error("STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  213. return cb({'status': 'failure', 'message': err});
  214. }
  215. logger.success("STATIONS_INDEX", `Indexing stations successful.`);
  216. return cb({'status': 'success', 'stations': stations});
  217. });
  218. },
  219. /**
  220. * Finds a station by name
  221. *
  222. * @param session
  223. * @param stationName - the station name
  224. * @param cb
  225. */
  226. findByName: (session, stationName, cb) => {
  227. async.waterfall([
  228. (next) => {
  229. stations.getStationByName(stationName, next);
  230. },
  231. (station, next) => {
  232. if (!station) return next('Station not found.');
  233. next(null, station);
  234. }
  235. ], (err, station) => {
  236. if (err) {
  237. err = utils.getError(err);
  238. logger.error("STATIONS_FIND_BY_NAME", `Finding station "${stationName}" failed. "${err}"`);
  239. return cb({'status': 'failure', 'message': err});
  240. }
  241. logger.success("STATIONS_FIND_BY_NAME", `Found station "${stationName}" successfully.`);
  242. cb({status: 'success', data: station});
  243. });
  244. },
  245. /**
  246. * Gets the official playlist for a station
  247. *
  248. * @param session
  249. * @param stationId - the station id
  250. * @param cb
  251. */
  252. getPlaylist: (session, stationId, cb) => {
  253. async.waterfall([
  254. (next) => {
  255. stations.getStation(stationId, next);
  256. },
  257. (station, next) => {
  258. if (!station) return next('Station not found.');
  259. else if (station.type !== 'official') return next('This is not an official station.');
  260. else next();
  261. },
  262. (next) => {
  263. cache.hget('officialPlaylists', stationId, next);
  264. },
  265. (playlist, next) => {
  266. if (!playlist) return next('Playlist not found.');
  267. next(null, playlist);
  268. }
  269. ], (err, playlist) => {
  270. if (err) {
  271. err = utils.getError(err);
  272. logger.error("STATIONS_GET_PLAYLIST", `Getting playlist for station "${stationId}" failed. "${err}"`);
  273. return cb({ status: 'failure', message: err });
  274. } else {
  275. logger.success("STATIONS_GET_PLAYLIST", `Got playlist for station "${stationId}" successfully.`);
  276. cb({ status: 'success', data: playlist.songs });
  277. }
  278. });
  279. },
  280. /**
  281. * Joins the station by its name
  282. *
  283. * @param session
  284. * @param stationName - the station name
  285. * @param cb
  286. * @return {{ status: String, userCount: Integer }}
  287. */
  288. join: (session, stationName, cb) => {
  289. async.waterfall([
  290. (next) => {
  291. stations.getStationByName(stationName, next);
  292. },
  293. (station, next) => {
  294. if (!station) return next('Station not found.');
  295. async.waterfall([
  296. (next) => {
  297. if (station.privacy !== 'private') return next(true);
  298. if (!session.userId) return next('An error occurred while joining the station.');
  299. next();
  300. },
  301. (next) => {
  302. db.models.user.findOne({_id: session.userId}, next);
  303. },
  304. (user, next) => {
  305. if (!user) return next('An error occurred while joining the station.');
  306. if (user.role === 'admin') return next(true);
  307. if (station.type === 'official') return next('An error occurred while joining the station.');
  308. if (station.owner === session.userId) return next(true);
  309. next('An error occurred while joining the station.');
  310. }
  311. ], (err) => {
  312. if (err === true) return next(null, station);
  313. next(utils.getError(err));
  314. });
  315. },
  316. (station, next) => {
  317. utils.socketJoinRoom(session.socketId, `station.${station._id}`);
  318. let data = {
  319. _id: station._id,
  320. type: station.type,
  321. currentSong: station.currentSong,
  322. startedAt: station.startedAt,
  323. paused: station.paused,
  324. timePaused: station.timePaused,
  325. description: station.description,
  326. displayName: station.displayName,
  327. privacy: station.privacy,
  328. partyMode: station.partyMode,
  329. owner: station.owner,
  330. privatePlaylist: station.privatePlaylist
  331. };
  332. userList[session.socketId] = station._id;
  333. next(null, data);
  334. },
  335. (data, next) => {
  336. data.userCount = usersPerStationCount[data._id] || 0;
  337. data.users = usersPerStation[data._id] || [];
  338. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  339. utils.socketJoinSongRoom(session.socketId, `song.${data.currentSong.songId}`);
  340. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  341. songs.getSongFromId(data.currentSong.songId, (err, song) => {
  342. if (!err && song) {
  343. data.currentSong.likes = song.likes;
  344. data.currentSong.dislikes = song.dislikes;
  345. } else {
  346. data.currentSong.likes = -1;
  347. data.currentSong.dislikes = -1;
  348. }
  349. next(null, data);
  350. });
  351. }
  352. ], (err, data) => {
  353. if (err) {
  354. err = utils.getError(err);
  355. logger.error("STATIONS_JOIN", `Joining station "${stationName}" failed. "${err}"`);
  356. return cb({'status': 'failure', 'message': err});
  357. }
  358. logger.success("STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  359. cb({status: 'success', data});
  360. });
  361. },
  362. /**
  363. * Votes to skip a station
  364. *
  365. * @param session
  366. * @param stationId - the station id
  367. * @param cb
  368. * @param userId
  369. */
  370. voteSkip: hooks.loginRequired((session, stationId, cb, userId) => {
  371. async.waterfall([
  372. (next) => {
  373. stations.getStation(stationId, next);
  374. },
  375. (station, next) => {
  376. if (!station) return next('Station not found.');
  377. utils.canUserBeInStation(station, userId, (canBe) => {
  378. if (canBe) return next(null, station);
  379. return next('Insufficient permissions.');
  380. });
  381. },
  382. (station, next) => {
  383. if (!station.currentSong) return next('There is currently no song to skip.');
  384. if (station.currentSong.skipVotes.indexOf(userId) !== -1) return next('You have already voted to skip this song.');
  385. next(null, station);
  386. },
  387. (station, next) => {
  388. db.models.station.update({_id: stationId}, {$push: {"currentSong.skipVotes": userId}}, next)
  389. },
  390. (res, next) => {
  391. stations.updateStation(stationId, next);
  392. },
  393. (station, next) => {
  394. if (!station) return next('Station not found.');
  395. next(null, station);
  396. }
  397. ], (err, station) => {
  398. if (err) {
  399. err = utils.getError(err);
  400. logger.error("STATIONS_VOTE_SKIP", `Vote skipping station "${stationId}" failed. "${err}"`);
  401. return cb({'status': 'failure', 'message': err});
  402. }
  403. logger.success("STATIONS_VOTE_SKIP", `Vote skipping "${stationId}" successful.`);
  404. cache.pub('station.voteSkipSong', stationId);
  405. if (station.currentSong && station.currentSong.skipVotes.length >= 3) stations.skipStation(stationId)();
  406. cb({ status: 'success', message: 'Successfully voted to skip the song.' });
  407. });
  408. }),
  409. /**
  410. * Force skips a station
  411. *
  412. * @param session
  413. * @param stationId - the station id
  414. * @param cb
  415. */
  416. forceSkip: hooks.ownerRequired((session, stationId, cb) => {
  417. async.waterfall([
  418. (next) => {
  419. stations.getStation(stationId, next);
  420. },
  421. (station, next) => {
  422. if (!station) return next('Station not found.');
  423. next();
  424. }
  425. ], (err) => {
  426. if (err) {
  427. err = utils.getError(err);
  428. logger.error("STATIONS_FORCE_SKIP", `Force skipping station "${stationId}" failed. "${err}"`);
  429. return cb({'status': 'failure', 'message': err});
  430. }
  431. notifications.unschedule(`stations.nextSong?id=${stationId}`);
  432. stations.skipStation(stationId)();
  433. logger.success("STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  434. return cb({'status': 'success', 'message': 'Successfully skipped station.'});
  435. });
  436. }),
  437. /**
  438. * Leaves the user's current station
  439. *
  440. * @param session
  441. * @param stationId
  442. * @param cb
  443. * @return {{ status: String, userCount: Integer }}
  444. */
  445. leave: (session, stationId, cb) => {
  446. async.waterfall([
  447. (next) => {
  448. stations.getStation(stationId, next);
  449. },
  450. (station, next) => {
  451. if (!station) return next('Station not found.');
  452. next();
  453. }
  454. ], (err, userCount) => {
  455. if (err) {
  456. err = utils.getError(err);
  457. logger.error("STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  458. return cb({'status': 'failure', 'message': err});
  459. }
  460. logger.success("STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  461. utils.socketLeaveRooms(session);
  462. delete userList[session.socketId];
  463. return cb({'status': 'success', 'message': 'Successfully left station.', userCount});
  464. });
  465. },
  466. /**
  467. * Updates a station's name
  468. *
  469. * @param session
  470. * @param stationId - the station id
  471. * @param newName - the new station name
  472. * @param cb
  473. */
  474. updateName: hooks.ownerRequired((session, stationId, newName, cb) => {
  475. async.waterfall([
  476. (next) => {
  477. db.models.station.update({_id: stationId}, {$set: {name: newName}}, {runValidators: true}, next);
  478. },
  479. (res, next) => {
  480. stations.updateStation(stationId, next);
  481. }
  482. ], (err) => {
  483. if (err) {
  484. err = utils.getError(err);
  485. logger.error("STATIONS_UPDATE_DISPLAY_NAME", `Updating station "${stationId}" displayName to "${newName}" failed. "${err}"`);
  486. return cb({'status': 'failure', 'message': err});
  487. }
  488. logger.success("STATIONS_UPDATE_DISPLAY_NAME", `Updated station "${stationId}" displayName to "${newName}" successfully.`);
  489. return cb({'status': 'success', 'message': 'Successfully updated the name.'});
  490. });
  491. }),
  492. /**
  493. * Updates a station's display name
  494. *
  495. * @param session
  496. * @param stationId - the station id
  497. * @param newDisplayName - the new station display name
  498. * @param cb
  499. */
  500. updateDisplayName: hooks.ownerRequired((session, stationId, newDisplayName, cb) => {
  501. async.waterfall([
  502. (next) => {
  503. db.models.station.update({_id: stationId}, {$set: {displayName: newDisplayName}}, {runValidators: true}, next);
  504. },
  505. (res, next) => {
  506. stations.updateStation(stationId, next);
  507. }
  508. ], (err) => {
  509. if (err) {
  510. err = utils.getError(err);
  511. logger.error("STATIONS_UPDATE_DISPLAY_NAME", `Updating station "${stationId}" displayName to "${newDisplayName}" failed. "${err}"`);
  512. return cb({'status': 'failure', 'message': err});
  513. }
  514. logger.success("STATIONS_UPDATE_DISPLAY_NAME", `Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`);
  515. return cb({'status': 'success', 'message': 'Successfully updated the display name.'});
  516. });
  517. }),
  518. /**
  519. * Updates a station's description
  520. *
  521. * @param session
  522. * @param stationId - the station id
  523. * @param newDescription - the new station description
  524. * @param cb
  525. */
  526. updateDescription: hooks.ownerRequired((session, stationId, newDescription, cb) => {
  527. async.waterfall([
  528. (next) => {
  529. db.models.station.update({_id: stationId}, {$set: {description: newDescription}}, {runValidators: true}, next);
  530. },
  531. (res, next) => {
  532. stations.updateStation(stationId, next);
  533. }
  534. ], (err) => {
  535. if (err) {
  536. err = utils.getError(err);
  537. logger.error("STATIONS_UPDATE_DESCRIPTION", `Updating station "${stationId}" description to "${newDescription}" failed. "${err}"`);
  538. return cb({'status': 'failure', 'message': err});
  539. }
  540. logger.success("STATIONS_UPDATE_DESCRIPTION", `Updated station "${stationId}" description to "${newDescription}" successfully.`);
  541. return cb({'status': 'success', 'message': 'Successfully updated the description.'});
  542. });
  543. }),
  544. /**
  545. * Updates a station's privacy
  546. *
  547. * @param session
  548. * @param stationId - the station id
  549. * @param newPrivacy - the new station privacy
  550. * @param cb
  551. */
  552. updatePrivacy: hooks.ownerRequired((session, stationId, newPrivacy, cb) => {
  553. async.waterfall([
  554. (next) => {
  555. db.models.station.update({_id: stationId}, {$set: {privacy: newPrivacy}}, {runValidators: true}, next);
  556. },
  557. (res, next) => {
  558. stations.updateStation(stationId, next);
  559. }
  560. ], (err) => {
  561. if (err) {
  562. err = utils.getError(err);
  563. logger.error("STATIONS_UPDATE_PRIVACY", `Updating station "${stationId}" privacy to "${newPrivacy}" failed. "${err}"`);
  564. return cb({'status': 'failure', 'message': err});
  565. }
  566. logger.success("STATIONS_UPDATE_PRIVACY", `Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`);
  567. return cb({'status': 'success', 'message': 'Successfully updated the privacy.'});
  568. });
  569. }),
  570. /**
  571. * Updates a station's party mode
  572. *
  573. * @param session
  574. * @param stationId - the station id
  575. * @param newPartyMode - the new station party mode
  576. * @param cb
  577. */
  578. updatePartyMode: hooks.ownerRequired((session, stationId, newPartyMode, cb) => {
  579. async.waterfall([
  580. (next) => {
  581. stations.getStation(stationId, next);
  582. },
  583. (station, next) => {
  584. if (!station) return next('Station not found.');
  585. if (station.partyMode === newPartyMode) return next('The party mode was already ' + ((newPartyMode) ? 'enabled.' : 'disabled.'));
  586. db.models.station.update({_id: stationId}, {$set: {partyMode: newPartyMode}}, {runValidators: true}, next);
  587. },
  588. (res, next) => {
  589. stations.updateStation(stationId, next);
  590. }
  591. ], (err) => {
  592. if (err) {
  593. err = utils.getError(err);
  594. logger.error("STATIONS_UPDATE_PARTY_MODE", `Updating station "${stationId}" party mode to "${newPartyMode}" failed. "${err}"`);
  595. return cb({'status': 'failure', 'message': err});
  596. }
  597. logger.success("STATIONS_UPDATE_PARTY_MODE", `Updated station "${stationId}" party mode to "${newPartyMode}" successfully.`);
  598. cache.pub('station.updatePartyMode', {stationId: stationId, partyMode: newPartyMode});
  599. stations.skipStation(stationId)();
  600. return cb({'status': 'success', 'message': 'Successfully updated the party mode.'});
  601. });
  602. }),
  603. /**
  604. * Pauses a station
  605. *
  606. * @param session
  607. * @param stationId - the station id
  608. * @param cb
  609. */
  610. pause: hooks.ownerRequired((session, stationId, cb) => {
  611. async.waterfall([
  612. (next) => {
  613. stations.getStation(stationId, next);
  614. },
  615. (station, next) => {
  616. if (!station) return next('Station not found.');
  617. if (station.paused) return next('That station was already paused.');
  618. db.models.station.update({_id: stationId}, {$set: {paused: true, pausedAt: Date.now()}}, next);
  619. },
  620. (res, next) => {
  621. stations.updateStation(stationId, next);
  622. }
  623. ], (err) => {
  624. if (err) {
  625. err = utils.getError(err);
  626. logger.error("STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  627. return cb({'status': 'failure', 'message': err});
  628. }
  629. logger.success("STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  630. cache.pub('station.pause', stationId);
  631. notifications.unschedule(`stations.nextSong?id=${stationId}`);
  632. return cb({'status': 'success', 'message': 'Successfully paused.'});
  633. });
  634. }),
  635. /**
  636. * Resumes a station
  637. *
  638. * @param session
  639. * @param stationId - the station id
  640. * @param cb
  641. */
  642. resume: hooks.ownerRequired((session, stationId, cb) => {
  643. async.waterfall([
  644. (next) => {
  645. stations.getStation(stationId, next);
  646. },
  647. (station, next) => {
  648. if (!station) return next('Station not found.');
  649. if (!station.paused) return next('That station is not paused.');
  650. station.timePaused += (Date.now() - station.pausedAt);
  651. db.models.station.update({_id: stationId}, {$set: {paused: false}, $inc: {timePaused: Date.now() - station.pausedAt}}, next);
  652. },
  653. (res, next) => {
  654. stations.updateStation(stationId, next);
  655. }
  656. ], (err) => {
  657. if (err) {
  658. err = utils.getError(err);
  659. logger.error("STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  660. return cb({'status': 'failure', 'message': err});
  661. }
  662. logger.success("STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  663. cache.pub('station.resume', stationId);
  664. return cb({'status': 'success', 'message': 'Successfully resumed.'});
  665. });
  666. }),
  667. /**
  668. * Removes a station
  669. *
  670. * @param session
  671. * @param stationId - the station id
  672. * @param cb
  673. */
  674. remove: hooks.ownerRequired((session, stationId, cb) => {
  675. async.waterfall([
  676. (next) => {
  677. db.models.station.remove({ _id: stationId }, err => next(err));
  678. },
  679. (next) => {
  680. cache.hdel('stations', stationId, err => next(err));
  681. }
  682. ], (err) => {
  683. if (err) {
  684. err = utils.getError(err);
  685. logger.error("STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  686. return cb({ 'status': 'failure', 'message': err });
  687. }
  688. logger.success("STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  689. cache.pub('station.remove', stationId);
  690. return cb({ 'status': 'success', 'message': 'Successfully removed.' });
  691. });
  692. }),
  693. /**
  694. * Create a station
  695. *
  696. * @param session
  697. * @param data - the station data
  698. * @param cb
  699. * @param userId
  700. */
  701. create: hooks.loginRequired((session, data, cb, userId) => {
  702. data.name = data.name.toLowerCase();
  703. let blacklist = ["country", "edm", "musare", "hip-hop", "rap", "top-hits", "todays-hits", "old-school", "christmas", "about", "support", "staff", "help", "news", "terms", "privacy", "profile", "c", "community", "tos", "login", "register", "p", "official", "o", "trap", "faq", "team", "donate", "buy", "shop", "forums", "explore", "settings", "admin", "auth", "reset_password"];
  704. async.waterfall([
  705. (next) => {
  706. if (!data) return next('Invalid data.');
  707. next();
  708. },
  709. (next) => {
  710. db.models.station.findOne({ $or: [{name: data.name}, {displayName: new RegExp(`^${data.displayName}$`, 'i')}] }, next);
  711. },
  712. (station, next) => {
  713. if (station) return next('A station with that name or display name already exists.');
  714. const { name, displayName, description, genres, playlist, type, blacklistedGenres } = data;
  715. if (type === 'official') {
  716. db.models.user.findOne({_id: userId}, (err, user) => {
  717. if (err) return next(err);
  718. if (!user) return next('User not found.');
  719. if (user.role !== 'admin') return next('Admin required.');
  720. db.models.station.create({
  721. name,
  722. displayName,
  723. description,
  724. type,
  725. privacy: 'private',
  726. playlist,
  727. genres,
  728. blacklistedGenres,
  729. currentSong: stations.defaultSong
  730. }, next);
  731. });
  732. } else if (type === 'community') {
  733. if (blacklist.indexOf(name) !== -1) return next('That name is blacklisted. Please use a different name.');
  734. db.models.station.create({
  735. name,
  736. displayName,
  737. description,
  738. type,
  739. privacy: 'private',
  740. owner: userId,
  741. queue: [],
  742. currentSong: null
  743. }, next);
  744. }
  745. }
  746. ], (err, station) => {
  747. if (err) {
  748. err = utils.getError(err);
  749. logger.error("STATIONS_CREATE", `Creating station failed. "${err}"`);
  750. return cb({'status': 'failure', 'message': err});
  751. }
  752. logger.success("STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  753. cache.pub('station.create', station._id);
  754. return cb({'status': 'success', 'message': 'Successfully created station.'});
  755. });
  756. }),
  757. /**
  758. * Adds song to station queue
  759. *
  760. * @param session
  761. * @param stationId - the station id
  762. * @param songId - the song id
  763. * @param cb
  764. * @param userId
  765. */
  766. addToQueue: hooks.loginRequired((session, stationId, songId, cb, userId) => {
  767. async.waterfall([
  768. (next) => {
  769. stations.getStation(stationId, next);
  770. },
  771. (station, next) => {
  772. if (!station) return next('Station not found.');
  773. if (station.type !== 'community') return next('That station is not a community station.');
  774. utils.canUserBeInStation(station, userId, (canBe) => {
  775. if (canBe) return next(null, station);
  776. return next('Insufficient permissions.');
  777. });
  778. },
  779. (station, next) => {
  780. if (station.currentSong && station.currentSong.songId === songId) return next('That song is currently playing.');
  781. async.each(station.queue, (queueSong, next) => {
  782. if (queueSong.songId === songId) return next('That song is already in the queue.');
  783. next();
  784. }, (err) => {
  785. next(err, station);
  786. });
  787. },
  788. (station, next) => {
  789. songs.getSong(songId, (err, song) => {
  790. if (!err && song) return next(null, song, station);
  791. utils.getSongFromYouTube(songId, (song) => {
  792. song.artists = [];
  793. song.skipDuration = 0;
  794. song.likes = -1;
  795. song.dislikes = -1;
  796. song.thumbnail = "empty";
  797. song.explicit = false;
  798. next(null, song, station);
  799. });
  800. });
  801. },
  802. (song, station, next) => {
  803. let queue = station.queue;
  804. song.requestedBy = userId;
  805. queue.push(song);
  806. let totalDuration = 0;
  807. queue.forEach((song) => {
  808. totalDuration += song.duration;
  809. });
  810. if (totalDuration >= 3600 * 3) return next('The max length of the queue is 3 hours.');
  811. next(null, song, station);
  812. },
  813. (song, station, next) => {
  814. let queue = station.queue;
  815. if (queue.length === 0) return next(null, song, station);
  816. let totalDuration = 0;
  817. const userId = queue[queue.length - 1].requestedBy;
  818. station.queue.forEach((song) => {
  819. if (userId === song.requestedBy) {
  820. totalDuration += song.duration;
  821. }
  822. });
  823. if(totalDuration >= 900) return next('The max length of songs per user is 15 minutes.');
  824. next(null, song, station);
  825. },
  826. (song, station, next) => {
  827. let queue = station.queue;
  828. if (queue.length === 0) return next(null, song);
  829. let totalSongs = 0;
  830. const userId = queue[queue.length - 1].requestedBy;
  831. queue.forEach((song) => {
  832. if (userId === song.requestedBy) {
  833. totalSongs++;
  834. }
  835. });
  836. if (totalSongs <= 2) return next(null, song);
  837. if (totalSongs > 3) return next('The max amount of songs per user is 3, and only 2 in a row is allowed.');
  838. if (queue[queue.length - 2].requestedBy !== userId || queue[queue.length - 3] !== userId) return next('The max amount of songs per user is 3, and only 2 in a row is allowed.');
  839. next(null, song);
  840. },
  841. (song, next) => {
  842. db.models.station.update({_id: stationId}, {$push: {queue: song}}, {runValidators: true}, next);
  843. },
  844. (res, next) => {
  845. stations.updateStation(stationId, next);
  846. }
  847. ], (err, station) => {
  848. if (err) {
  849. err = utils.getError(err);
  850. logger.error("STATIONS_ADD_SONG_TO_QUEUE", `Adding song "${songId}" to station "${stationId}" queue failed. "${err}"`);
  851. return cb({'status': 'failure', 'message': err});
  852. }
  853. logger.success("STATIONS_ADD_SONG_TO_QUEUE", `Added song "${songId}" to station "${stationId}" successfully.`);
  854. cache.pub('station.queueUpdate', stationId);
  855. return cb({'status': 'success', 'message': 'Successfully added song to queue.'});
  856. });
  857. }),
  858. /**
  859. * Removes song from station queue
  860. *
  861. * @param session
  862. * @param stationId - the station id
  863. * @param songId - the song id
  864. * @param cb
  865. * @param userId
  866. */
  867. removeFromQueue: hooks.ownerRequired((session, stationId, songId, cb, userId) => {
  868. async.waterfall([
  869. (next) => {
  870. if (!songId) return next('Invalid song id.');
  871. stations.getStation(stationId, next);
  872. },
  873. (station, next) => {
  874. if (!station) return next('Station not found.');
  875. if (station.type !== 'community') return next('Station is not a community station.');
  876. async.each(station.queue, (queueSong, next) => {
  877. if (queueSong.songId === songId) return next(true);
  878. next();
  879. }, (err) => {
  880. if (err === true) return next();
  881. next('Song is not currently in the queue.');
  882. });
  883. },
  884. (next) => {
  885. db.models.update({_id: stationId}, {$pull: {queue: {songId: songId}}}, next);
  886. },
  887. (next) => {
  888. stations.updateStation(stationId, next);
  889. }
  890. ], (err, station) => {
  891. if (err) {
  892. err = utils.getError(err);
  893. logger.error("STATIONS_REMOVE_SONG_TO_QUEUE", `Removing song "${songId}" from station "${stationId}" queue failed. "${err}"`);
  894. return cb({'status': 'failure', 'message': err});
  895. }
  896. logger.success("STATIONS_REMOVE_SONG_TO_QUEUE", `Removed song "${songId}" from station "${stationId}" successfully.`);
  897. cache.pub('station.queueUpdate', stationId);
  898. return cb({'status': 'success', 'message': 'Successfully removed song from queue.'});
  899. });
  900. }),
  901. /**
  902. * Gets the queue from a station
  903. *
  904. * @param session
  905. * @param stationId - the station id
  906. * @param cb
  907. */
  908. getQueue: (session, stationId, cb) => {
  909. async.waterfall([
  910. (next) => {
  911. stations.getStation(stationId, next);
  912. },
  913. (station, next) => {
  914. if (!station) return next('Station not found.');
  915. if (station.type !== 'community') return next('Station is not a community station.');
  916. next(null, station);
  917. },
  918. (station, next) => {
  919. utils.canUserBeInStation(station, session.userId, (canBe) => {
  920. if (canBe) return next(null, station);
  921. return next('Insufficient permissions.');
  922. });
  923. }
  924. ], (err, station) => {
  925. if (err) {
  926. err = utils.getError(err);
  927. logger.error("STATIONS_GET_QUEUE", `Getting queue for station "${stationId}" failed. "${err}"`);
  928. return cb({'status': 'failure', 'message': err});
  929. }
  930. logger.success("STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  931. return cb({'status': 'success', 'message': 'Successfully got queue.', queue: station.queue});
  932. });
  933. },
  934. /**
  935. * Selects a private playlist for a station
  936. *
  937. * @param session
  938. * @param stationId - the station id
  939. * @param playlistId - the private playlist id
  940. * @param cb
  941. * @param userId
  942. */
  943. selectPrivatePlaylist: hooks.ownerRequired((session, stationId, playlistId, cb, userId) => {
  944. async.waterfall([
  945. (next) => {
  946. stations.getStation(stationId, next);
  947. },
  948. (station, next) => {
  949. if (!station) return next('Station not found.');
  950. if (station.type !== 'community') return next('Station is not a community station.');
  951. if (station.privatePlaylist === playlistId) return next('That private playlist is already selected.');
  952. db.models.playlist.findOne({_id: playlistId}, next);
  953. },
  954. (playlist, next) => {
  955. if (!playlist) return next('Playlist not found.');
  956. let currentSongIndex = (playlist.songs.length > 0) ? playlist.songs.length - 1 : 0;
  957. db.models.station.update({_id: stationId}, {$set: {privatePlaylist: playlistId, currentSongIndex: currentSongIndex}}, {runValidators: true}, next);
  958. },
  959. (res, next) => {
  960. stations.updateStation(stationId, next);
  961. }
  962. ], (err, station) => {
  963. if (err) {
  964. err = utils.getError(err);
  965. logger.error("STATIONS_SELECT_PRIVATE_PLAYLIST", `Selecting private playlist "${playlistId}" for station "${stationId}" failed. "${err}"`);
  966. return cb({'status': 'failure', 'message': err});
  967. }
  968. logger.success("STATIONS_SELECT_PRIVATE_PLAYLIST", `Selected private playlist "${playlistId}" for station "${stationId}" successfully.`);
  969. notifications.unschedule(`stations.nextSong?id${stationId}`);
  970. if (!station.partyMode) stations.skipStation(stationId)();
  971. cache.pub('privatePlaylist.selected', {playlistId, stationId});
  972. return cb({'status': 'success', 'message': 'Successfully selected playlist.'});
  973. });
  974. }),
  975. };