stations.js 53 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160
  1. import async from "async";
  2. import underscore from "underscore";
  3. import { isLoginRequired, isOwnerRequired } from "./hooks";
  4. import db from "../db";
  5. import utils from "../utils";
  6. import songs from "../songs";
  7. import cache from "../cache";
  8. import notifications from "../notifications";
  9. import stations from "../stations";
  10. import activities from "../activities";
  11. const { _ } = underscore;
  12. // const logger = moduleManager.modules["logger"];
  13. const userList = {};
  14. let usersPerStation = {};
  15. let usersPerStationCount = {};
  16. // Temporarily disabled until the messages in console can be limited
  17. setInterval(async () => {
  18. const stationsCountUpdated = [];
  19. const stationsUpdated = [];
  20. const oldUsersPerStation = usersPerStation;
  21. usersPerStation = {};
  22. const oldUsersPerStationCount = usersPerStationCount;
  23. usersPerStationCount = {};
  24. const userModel = await db.runJob("GET_MODEL", {
  25. modelName: "user"
  26. });
  27. async.each(
  28. Object.keys(userList),
  29. (socketId, next) => {
  30. utils.runJob("SOCKET_FROM_SESSION", { socketId }, { isQuiet: true }).then(socket => {
  31. const stationId = userList[socketId];
  32. if (!socket || Object.keys(socket.rooms).indexOf(`station.${stationId}`) === -1) {
  33. if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  34. if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
  35. delete userList[socketId];
  36. return next();
  37. }
  38. if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0;
  39. usersPerStationCount[stationId] += 1;
  40. if (!usersPerStation[stationId]) usersPerStation[stationId] = [];
  41. return async.waterfall(
  42. [
  43. next => {
  44. if (!socket.session || !socket.session.sessionId) return next("No session found.");
  45. return cache
  46. .runJob("HGET", {
  47. table: "sessions",
  48. key: socket.session.sessionId
  49. })
  50. .then(session => {
  51. next(null, session);
  52. })
  53. .catch(next);
  54. },
  55. (session, next) => {
  56. if (!session) return next("Session not found.");
  57. return userModel.findOne({ _id: session.userId }, next);
  58. },
  59. (user, next) => {
  60. if (!user) return next("User not found.");
  61. if (usersPerStation[stationId].indexOf(user.username) !== -1)
  62. return next("User already in the list.");
  63. return next(null, user.username);
  64. }
  65. ],
  66. (err, username) => {
  67. if (!err) {
  68. usersPerStation[stationId].push(username);
  69. }
  70. next();
  71. }
  72. );
  73. });
  74. // TODO Code to show users
  75. },
  76. () => {
  77. for (
  78. let stationId = 0, stationKeys = Object.keys(usersPerStationCount);
  79. stationId < stationKeys.length;
  80. stationId += 1
  81. ) {
  82. if (oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]) {
  83. if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  84. }
  85. }
  86. for (
  87. let stationId = 0, stationKeys = Object.keys(usersPerStation);
  88. stationId < stationKeys.length;
  89. stationId += 1
  90. ) {
  91. if (
  92. _.difference(usersPerStation[stationId], oldUsersPerStation[stationId]).length > 0 ||
  93. _.difference(oldUsersPerStation[stationId], usersPerStation[stationId]).length > 0
  94. ) {
  95. if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
  96. }
  97. }
  98. stationsCountUpdated.forEach(stationId => {
  99. console.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  100. cache.runJob("PUB", {
  101. table: "station.updateUserCount",
  102. value: stationId
  103. });
  104. });
  105. stationsUpdated.forEach(stationId => {
  106. console.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  107. cache.runJob("PUB", {
  108. table: "station.updateUsers",
  109. value: stationId
  110. });
  111. });
  112. // console.log("Userlist", usersPerStation);
  113. }
  114. );
  115. }, 3000);
  116. cache.runJob("SUB", {
  117. channel: "station.updateUsers",
  118. cb: stationId => {
  119. const list = usersPerStation[stationId] || [];
  120. utils.runJob("EMIT_TO_ROOM", {
  121. room: `station.${stationId}`,
  122. args: ["event:users.updated", list]
  123. });
  124. }
  125. });
  126. cache.runJob("SUB", {
  127. channel: "station.updateUserCount",
  128. cb: stationId => {
  129. const count = usersPerStationCount[stationId] || 0;
  130. utils.runJob("EMIT_TO_ROOM", {
  131. room: `station.${stationId}`,
  132. args: ["event:userCount.updated", count]
  133. });
  134. stations.runJob("GET_STATION", { stationId }).then(async station => {
  135. if (station.privacy === "public")
  136. utils.runJob("EMIT_TO_ROOM", {
  137. room: "home",
  138. args: ["event:userCount.updated", stationId, count]
  139. });
  140. else {
  141. const sockets = await utils.runJob("GET_ROOM_SOCKETS", {
  142. room: "home"
  143. });
  144. for (let socketId = 0, socketKeys = Object.keys(sockets); socketId < socketKeys.length; socketId += 1) {
  145. const socket = sockets[socketKeys[socketId]];
  146. const { session } = sockets[socketKeys[socketId]];
  147. if (session.sessionId) {
  148. cache
  149. .runJob("HGET", {
  150. table: "sessions",
  151. key: session.sessionId
  152. })
  153. .then(session => {
  154. if (session)
  155. db.runJob("GET_MODEL", {
  156. modelName: "user"
  157. }).then(userModel =>
  158. userModel.findOne({ _id: session.userId }, (err, user) => {
  159. if (user.role === "admin")
  160. socket.emit("event:userCount.updated", stationId, count);
  161. else if (station.type === "community" && station.owner === session.userId)
  162. socket.emit("event:userCount.updated", stationId, count);
  163. })
  164. );
  165. });
  166. }
  167. }
  168. }
  169. });
  170. }
  171. });
  172. cache.runJob("SUB", {
  173. channel: "station.queueLockToggled",
  174. cb: data => {
  175. utils.runJob("EMIT_TO_ROOM", {
  176. room: `station.${data.stationId}`,
  177. args: ["event:queueLockToggled", data.locked]
  178. });
  179. }
  180. });
  181. cache.runJob("SUB", {
  182. channel: "station.updatePartyMode",
  183. cb: data => {
  184. utils.runJob("EMIT_TO_ROOM", {
  185. room: `station.${data.stationId}`,
  186. args: ["event:partyMode.updated", data.partyMode]
  187. });
  188. }
  189. });
  190. cache.runJob("SUB", {
  191. channel: "privatePlaylist.selected",
  192. cb: data => {
  193. utils.runJob("EMIT_TO_ROOM", {
  194. room: `station.${data.stationId}`,
  195. args: ["event:privatePlaylist.selected", data.playlistId]
  196. });
  197. }
  198. });
  199. cache.runJob("SUB", {
  200. channel: "station.pause",
  201. cb: stationId => {
  202. stations.runJob("GET_STATION", { stationId }).then(station => {
  203. utils.runJob("EMIT_TO_ROOM", {
  204. room: `station.${stationId}`,
  205. args: ["event:stations.pause", { pausedAt: station.pausedAt }]
  206. });
  207. });
  208. }
  209. });
  210. cache.runJob("SUB", {
  211. channel: "station.resume",
  212. cb: stationId => {
  213. stations.runJob("GET_STATION", { stationId }).then(station => {
  214. utils.runJob("EMIT_TO_ROOM", {
  215. room: `station.${stationId}`,
  216. args: ["event:stations.resume", { timePaused: station.timePaused }]
  217. });
  218. });
  219. }
  220. });
  221. cache.runJob("SUB", {
  222. channel: "station.queueUpdate",
  223. cb: stationId => {
  224. stations.runJob("GET_STATION", { stationId }).then(station => {
  225. utils.runJob("EMIT_TO_ROOM", {
  226. room: `station.${stationId}`,
  227. args: ["event:queue.update", station.queue]
  228. });
  229. });
  230. }
  231. });
  232. cache.runJob("SUB", {
  233. channel: "station.voteSkipSong",
  234. cb: stationId => {
  235. utils.runJob("EMIT_TO_ROOM", {
  236. room: `station.${stationId}`,
  237. args: ["event:song.voteSkipSong"]
  238. });
  239. }
  240. });
  241. cache.runJob("SUB", {
  242. channel: "station.remove",
  243. cb: stationId => {
  244. utils.runJob("EMIT_TO_ROOM", {
  245. room: `station.${stationId}`,
  246. args: ["event:stations.remove"]
  247. });
  248. utils.runJob("EMIT_TO_ROOM", {
  249. room: "admin.stations",
  250. args: ["event:admin.station.removed", stationId]
  251. });
  252. }
  253. });
  254. cache.runJob("SUB", {
  255. channel: "station.create",
  256. cb: async stationId => {
  257. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  258. stations.runJob("INITIALIZE_STATION", { stationId }).then(async response => {
  259. const { station } = response;
  260. station.userCount = usersPerStationCount[stationId] || 0;
  261. utils.runJob("EMIT_TO_ROOM", {
  262. room: "admin.stations",
  263. args: ["event:admin.station.added", station]
  264. });
  265. // TODO If community, check if on whitelist
  266. if (station.privacy === "public")
  267. utils.runJob("EMIT_TO_ROOM", {
  268. room: "home",
  269. args: ["event:stations.created", station]
  270. });
  271. else {
  272. const sockets = await utils.runJob("GET_ROOM_SOCKETS", {
  273. room: "home"
  274. });
  275. for (let socketId = 0, socketKeys = Object.keys(sockets); socketId < socketKeys.length; socketId += 1) {
  276. const socket = sockets[socketKeys[socketId]];
  277. const { session } = sockets[socketKeys[socketId]];
  278. if (session.sessionId) {
  279. cache
  280. .runJob("HGET", {
  281. table: "sessions",
  282. key: session.sessionId
  283. })
  284. .then(session => {
  285. if (session) {
  286. userModel.findOne({ _id: session.userId }, (err, user) => {
  287. if (user.role === "admin") socket.emit("event:stations.created", station);
  288. else if (station.type === "community" && station.owner === session.userId)
  289. socket.emit("event:stations.created", station);
  290. });
  291. }
  292. });
  293. }
  294. }
  295. }
  296. });
  297. }
  298. });
  299. export default {
  300. /**
  301. * Get a list of all the stations
  302. *
  303. * @param {object} session - user session
  304. * @param {Function} cb - callback
  305. */
  306. index: (session, cb) => {
  307. async.waterfall(
  308. [
  309. next => {
  310. cache.runJob("HGETALL", { table: "stations" }).then(stations => {
  311. next(null, stations);
  312. });
  313. },
  314. (items, next) => {
  315. const filteredStations = [];
  316. async.each(
  317. items,
  318. (station, next) => {
  319. async.waterfall(
  320. [
  321. next => {
  322. stations
  323. .runJob("CAN_USER_VIEW_STATION", {
  324. station,
  325. userId: session.userId,
  326. hideUnlisted: true
  327. })
  328. .then(exists => {
  329. next(null, exists);
  330. })
  331. .catch(next);
  332. }
  333. ],
  334. (err, exists) => {
  335. if (err) console.log(err);
  336. station.userCount = usersPerStationCount[station._id] || 0;
  337. if (exists) filteredStations.push(station);
  338. next();
  339. }
  340. );
  341. },
  342. () => next(null, filteredStations)
  343. );
  344. }
  345. ],
  346. async (err, stations) => {
  347. if (err) {
  348. err = await utils.runJob("GET_ERROR", { error: err });
  349. console.log("ERROR", "STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  350. return cb({ status: "failure", message: err });
  351. }
  352. console.log("SUCCESS", "STATIONS_INDEX", `Indexing stations successful.`, false);
  353. return cb({ status: "success", stations });
  354. }
  355. );
  356. },
  357. /**
  358. * Obtains basic metadata of a station in order to format an activity
  359. *
  360. * @param {object} session - user session
  361. * @param {string} stationId - the station id
  362. * @param {Function} cb - callback
  363. */
  364. getStationForActivity: (session, stationId, cb) => {
  365. async.waterfall(
  366. [
  367. next => {
  368. stations
  369. .runJob("GET_STATION", { stationId })
  370. .then(station => {
  371. next(null, station);
  372. })
  373. .catch(next);
  374. }
  375. ],
  376. async (err, station) => {
  377. if (err) {
  378. err = await utils.runJob("GET_ERROR", { error: err });
  379. console.log(
  380. "ERROR",
  381. "STATIONS_GET_STATION_FOR_ACTIVITY",
  382. `Failed to obtain metadata of station ${stationId} for activity formatting. "${err}"`
  383. );
  384. return cb({ status: "failure", message: err });
  385. }
  386. console.log(
  387. "SUCCESS",
  388. "STATIONS_GET_STATION_FOR_ACTIVITY",
  389. `Obtained metadata of station ${stationId} for activity formatting successfully.`
  390. );
  391. return cb({
  392. status: "success",
  393. data: {
  394. title: station.displayName,
  395. thumbnail: station.currentSong ? station.currentSong.thumbnail : ""
  396. }
  397. });
  398. }
  399. );
  400. },
  401. /**
  402. * Verifies that a station exists
  403. *
  404. * @param {object} session - user session
  405. * @param {string} stationName - the station name
  406. * @param {Function} cb - callback
  407. */
  408. existsByName: (session, stationName, cb) => {
  409. async.waterfall(
  410. [
  411. next => {
  412. stations
  413. .runJob("GET_STATION_BY_NAME", { stationName })
  414. .then(station => {
  415. next(null, station);
  416. })
  417. .catch(next);
  418. },
  419. (station, next) => {
  420. if (!station) return next(null, false);
  421. return stations
  422. .runJob("CAN_USER_VIEW_STATION", {
  423. station,
  424. userId: session.userId
  425. })
  426. .then(exists => {
  427. next(null, exists);
  428. })
  429. .catch(next);
  430. }
  431. ],
  432. async (err, exists) => {
  433. if (err) {
  434. err = await utils.runJob("GET_ERROR", { error: err });
  435. console.log(
  436. "ERROR",
  437. "STATION_EXISTS_BY_NAME",
  438. `Checking if station "${stationName}" exists failed. "${err}"`
  439. );
  440. return cb({ status: "failure", message: err });
  441. }
  442. console.log(
  443. "SUCCESS",
  444. "STATION_EXISTS_BY_NAME",
  445. `Station "${stationName}" exists successfully.` /* , false */
  446. );
  447. return cb({ status: "success", exists });
  448. }
  449. );
  450. },
  451. /**
  452. * Gets the official playlist for a station
  453. *
  454. * @param {object} session - user session
  455. * @param {string} stationId - the station id
  456. * @param {Function} cb - callback
  457. */
  458. getPlaylist: (session, stationId, cb) => {
  459. async.waterfall(
  460. [
  461. next => {
  462. stations
  463. .runJob("GET_STATION", { stationId })
  464. .then(station => {
  465. next(null, station);
  466. })
  467. .catch(next);
  468. },
  469. (station, next) => {
  470. stations
  471. .runJob("CAN_USER_VIEW_STATION", {
  472. station,
  473. userId: session.userId
  474. })
  475. .then(canView => {
  476. if (canView) return next(null, station);
  477. return next("Insufficient permissions.");
  478. })
  479. .catch(err => next(err));
  480. },
  481. (station, next) => {
  482. if (!station) return next("Station not found.");
  483. if (station.type !== "official") return next("This is not an official station.");
  484. return next();
  485. },
  486. next => {
  487. cache
  488. .runJob("HGET", {
  489. table: "officialPlaylists",
  490. key: stationId
  491. })
  492. .then(playlist => {
  493. next(null, playlist);
  494. })
  495. .catch(next);
  496. },
  497. (playlist, next) => {
  498. if (!playlist) return next("Playlist not found.");
  499. return next(null, playlist);
  500. }
  501. ],
  502. async (err, playlist) => {
  503. if (err) {
  504. err = await utils.runJob("GET_ERROR", { error: err });
  505. console.log(
  506. "ERROR",
  507. "STATIONS_GET_PLAYLIST",
  508. `Getting playlist for station "${stationId}" failed. "${err}"`
  509. );
  510. return cb({ status: "failure", message: err });
  511. }
  512. console.log(
  513. "SUCCESS",
  514. "STATIONS_GET_PLAYLIST",
  515. `Got playlist for station "${stationId}" successfully.`,
  516. false
  517. );
  518. return cb({ status: "success", data: playlist.songs });
  519. }
  520. );
  521. },
  522. /**
  523. * Joins the station by its name
  524. *
  525. * @param {object} session - user session
  526. * @param {string} stationName - the station name
  527. * @param {Function} cb - callback
  528. */
  529. join: (session, stationName, cb) => {
  530. async.waterfall(
  531. [
  532. next => {
  533. stations
  534. .runJob("GET_STATION_BY_NAME", { stationName })
  535. .then(station => {
  536. next(null, station);
  537. })
  538. .catch(next);
  539. },
  540. (station, next) => {
  541. if (!station) return next("Station not found.");
  542. return stations
  543. .runJob("CAN_USER_VIEW_STATION", {
  544. station,
  545. userId: session.userId
  546. })
  547. .then(canView => {
  548. if (!canView) next("Not allowed to join station.");
  549. else next(null, station);
  550. })
  551. .catch(err => next(err));
  552. },
  553. (station, next) => {
  554. utils.runJob("SOCKET_JOIN_ROOM", {
  555. socketId: session.socketId,
  556. room: `station.${station._id}`
  557. });
  558. const data = {
  559. _id: station._id,
  560. type: station.type,
  561. currentSong: station.currentSong,
  562. startedAt: station.startedAt,
  563. paused: station.paused,
  564. timePaused: station.timePaused,
  565. pausedAt: station.pausedAt,
  566. description: station.description,
  567. displayName: station.displayName,
  568. privacy: station.privacy,
  569. locked: station.locked,
  570. partyMode: station.partyMode,
  571. owner: station.owner,
  572. privatePlaylist: station.privatePlaylist
  573. };
  574. userList[session.socketId] = station._id;
  575. next(null, data);
  576. },
  577. (data, next) => {
  578. data = JSON.parse(JSON.stringify(data));
  579. data.userCount = usersPerStationCount[data._id] || 0;
  580. data.users = usersPerStation[data._id] || [];
  581. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  582. utils.runJob("SOCKET_JOIN_SONG_ROOM", {
  583. socketId: session.socketId,
  584. room: `song.${data.currentSong.songId}`
  585. });
  586. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  587. return songs
  588. .runJob("GET_SONG_FROM_ID", {
  589. songId: data.currentSong.songId
  590. })
  591. .then(response => {
  592. const { song } = response;
  593. if (song) {
  594. data.currentSong.likes = song.likes;
  595. data.currentSong.dislikes = song.dislikes;
  596. } else {
  597. data.currentSong.likes = -1;
  598. data.currentSong.dislikes = -1;
  599. }
  600. })
  601. .catch(() => {
  602. data.currentSong.likes = -1;
  603. data.currentSong.dislikes = -1;
  604. })
  605. .finally(() => {
  606. next(null, data);
  607. });
  608. }
  609. ],
  610. async (err, data) => {
  611. if (err) {
  612. err = await utils.runJob("GET_ERROR", { error: err });
  613. console.log("ERROR", "STATIONS_JOIN", `Joining station "${stationName}" failed. "${err}"`);
  614. return cb({ status: "failure", message: err });
  615. }
  616. console.log("SUCCESS", "STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  617. return cb({ status: "success", data });
  618. }
  619. );
  620. },
  621. /**
  622. * Toggles if a station is locked
  623. *
  624. * @param session
  625. * @param stationId - the station id
  626. * @param cb
  627. */
  628. toggleLock: isOwnerRequired(async (session, stationId, cb) => {
  629. const stationModel = await db.runJob("GET_MODEL", {
  630. modelName: "station"
  631. });
  632. async.waterfall(
  633. [
  634. next => {
  635. stations
  636. .runJob("GET_STATION", { stationId })
  637. .then(station => {
  638. next(null, station);
  639. })
  640. .catch(next);
  641. },
  642. (station, next) => {
  643. stationModel.updateOne({ _id: stationId }, { $set: { locked: !station.locked } }, next);
  644. },
  645. (res, next) => {
  646. stations
  647. .runJob("UPDATE_STATION", { stationId })
  648. .then(station => {
  649. next(null, station);
  650. })
  651. .catch(next);
  652. }
  653. ],
  654. async (err, station) => {
  655. if (err) {
  656. err = await utils.runJob("GET_ERROR", { error: err });
  657. console.log(
  658. "ERROR",
  659. "STATIONS_UPDATE_LOCKED_STATUS",
  660. `Toggling the queue lock for station "${stationId}" failed. "${err}"`
  661. );
  662. return cb({ status: "failure", message: err });
  663. }
  664. console.log(
  665. "SUCCESS",
  666. "STATIONS_UPDATE_LOCKED_STATUS",
  667. `Toggled the queue lock for station "${stationId}" successfully to "${station.locked}".`
  668. );
  669. cache.runJob("PUB", {
  670. channel: "station.queueLockToggled",
  671. value: {
  672. stationId,
  673. locked: station.locked
  674. }
  675. });
  676. return cb({ status: "success", data: station.locked });
  677. }
  678. );
  679. }),
  680. /**
  681. * Votes to skip a station
  682. *
  683. * @param session
  684. * @param stationId - the station id
  685. * @param cb
  686. */
  687. voteSkip: isLoginRequired(async (session, stationId, cb) => {
  688. const stationModel = await db.runJob("GET_MODEL", {
  689. modelName: "station"
  690. });
  691. let skipVotes = 0;
  692. let shouldSkip = false;
  693. async.waterfall(
  694. [
  695. next => {
  696. stations
  697. .runJob("GET_STATION", { stationId })
  698. .then(station => {
  699. next(null, station);
  700. })
  701. .catch(next);
  702. },
  703. (station, next) => {
  704. if (!station) return next("Station not found.");
  705. return stations
  706. .runJob("CAN_USER_VIEW_STATION", {
  707. station,
  708. userId: session.userId
  709. })
  710. .then(canView => {
  711. if (canView) return next(null, station);
  712. return next("Insufficient permissions.");
  713. })
  714. .catch(err => next(err));
  715. },
  716. (station, next) => {
  717. if (!station.currentSong) return next("There is currently no song to skip.");
  718. if (station.currentSong.skipVotes.indexOf(session.userId) !== -1)
  719. return next("You have already voted to skip this song.");
  720. return next(null, station);
  721. },
  722. (station, next) => {
  723. stationModel.updateOne(
  724. { _id: stationId },
  725. { $push: { "currentSong.skipVotes": session.userId } },
  726. next
  727. );
  728. },
  729. (res, next) => {
  730. stations
  731. .runJob("UPDATE_STATION", { stationId })
  732. .then(station => {
  733. next(null, station);
  734. })
  735. .catch(next);
  736. },
  737. (station, next) => {
  738. if (!station) return next("Station not found.");
  739. return next(null, station);
  740. },
  741. (station, next) => {
  742. skipVotes = station.currentSong.skipVotes.length;
  743. utils
  744. .runJob("GET_ROOM_SOCKETS", {
  745. room: `station.${stationId}`
  746. })
  747. .then(sockets => {
  748. next(null, sockets);
  749. })
  750. .catch(next);
  751. },
  752. (sockets, next) => {
  753. if (sockets.length <= skipVotes) shouldSkip = true;
  754. next();
  755. }
  756. ],
  757. async err => {
  758. if (err) {
  759. err = await utils.runJob("GET_ERROR", { error: err });
  760. console.log("ERROR", "STATIONS_VOTE_SKIP", `Vote skipping station "${stationId}" failed. "${err}"`);
  761. return cb({ status: "failure", message: err });
  762. }
  763. console.log("SUCCESS", "STATIONS_VOTE_SKIP", `Vote skipping "${stationId}" successful.`);
  764. cache.runJob("PUB", {
  765. channel: "station.voteSkipSong",
  766. value: stationId
  767. });
  768. if (shouldSkip) stations.runJob("SKIP_STATION", { stationId });
  769. return cb({
  770. status: "success",
  771. message: "Successfully voted to skip the song."
  772. });
  773. }
  774. );
  775. }),
  776. /**
  777. * Force skips a station
  778. *
  779. * @param session
  780. * @param stationId - the station id
  781. * @param cb
  782. */
  783. forceSkip: isOwnerRequired((session, stationId, cb) => {
  784. async.waterfall(
  785. [
  786. next => {
  787. stations
  788. .runJob("GET_STATION", { stationId })
  789. .then(station => {
  790. next(null, station);
  791. })
  792. .catch(next);
  793. },
  794. (station, next) => {
  795. if (!station) return next("Station not found.");
  796. return next();
  797. }
  798. ],
  799. async err => {
  800. if (err) {
  801. err = await utils.runJob("GET_ERROR", { error: err });
  802. console.log(
  803. "ERROR",
  804. "STATIONS_FORCE_SKIP",
  805. `Force skipping station "${stationId}" failed. "${err}"`
  806. );
  807. return cb({ status: "failure", message: err });
  808. }
  809. notifications.runJob("UNSCHEDULE", {
  810. name: `stations.nextSong?id=${stationId}`
  811. });
  812. stations.runJob("SKIP_STATION", { stationId });
  813. console.log("SUCCESS", "STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  814. return cb({
  815. status: "success",
  816. message: "Successfully skipped station."
  817. });
  818. }
  819. );
  820. }),
  821. /**
  822. * Leaves the user's current station
  823. *
  824. * @param {object} session - user session
  825. * @param {string} stationId - id of station to leave
  826. * @param {Function} cb - callback
  827. */
  828. leave: (session, stationId, cb) => {
  829. async.waterfall(
  830. [
  831. next => {
  832. stations
  833. .runJob("GET_STATION", { stationId })
  834. .then(station => {
  835. next(null, station);
  836. })
  837. .catch(next);
  838. },
  839. (station, next) => {
  840. if (!station) return next("Station not found.");
  841. return next();
  842. }
  843. ],
  844. async (err, userCount) => {
  845. if (err) {
  846. err = await utils.runJob("GET_ERROR", { error: err });
  847. console.log("ERROR", "STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  848. return cb({ status: "failure", message: err });
  849. }
  850. console.log("SUCCESS", "STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  851. utils.runJob("SOCKET_LEAVE_ROOMS", { socketId: session });
  852. delete userList[session.socketId];
  853. return cb({
  854. status: "success",
  855. message: "Successfully left station.",
  856. userCount
  857. });
  858. }
  859. );
  860. },
  861. /**
  862. * Updates a station's name
  863. *
  864. * @param session
  865. * @param stationId - the station id
  866. * @param newName - the new station name
  867. * @param cb
  868. */
  869. updateName: isOwnerRequired(async (session, stationId, newName, cb) => {
  870. const stationModel = await db.runJob("GET_MODEL", {
  871. modelName: "station"
  872. });
  873. async.waterfall(
  874. [
  875. next => {
  876. stationModel.updateOne(
  877. { _id: stationId },
  878. { $set: { name: newName } },
  879. { runValidators: true },
  880. next
  881. );
  882. },
  883. (res, next) => {
  884. stations
  885. .runJob("UPDATE_STATION", { stationId })
  886. .then(station => {
  887. next(null, station);
  888. })
  889. .catch(next);
  890. }
  891. ],
  892. async err => {
  893. if (err) {
  894. err = await utils.runJob("GET_ERROR", { error: err });
  895. console.log(
  896. "ERROR",
  897. "STATIONS_UPDATE_NAME",
  898. `Updating station "${stationId}" name to "${newName}" failed. "${err}"`
  899. );
  900. return cb({ status: "failure", message: err });
  901. }
  902. console.log(
  903. "SUCCESS",
  904. "STATIONS_UPDATE_NAME",
  905. `Updated station "${stationId}" name to "${newName}" successfully.`
  906. );
  907. return cb({
  908. status: "success",
  909. message: "Successfully updated the name."
  910. });
  911. }
  912. );
  913. }),
  914. /**
  915. * Updates a station's display name
  916. *
  917. * @param session
  918. * @param stationId - the station id
  919. * @param newDisplayName - the new station display name
  920. * @param cb
  921. */
  922. updateDisplayName: isOwnerRequired(async (session, stationId, newDisplayName, cb) => {
  923. const stationModel = await db.runJob("GET_MODEL", {
  924. modelName: "station"
  925. });
  926. async.waterfall(
  927. [
  928. next => {
  929. stationModel.updateOne(
  930. { _id: stationId },
  931. { $set: { displayName: newDisplayName } },
  932. { runValidators: true },
  933. next
  934. );
  935. },
  936. (res, next) => {
  937. stations
  938. .runJob("UPDATE_STATION", { stationId })
  939. .then(station => {
  940. next(null, station);
  941. })
  942. .catch(next);
  943. }
  944. ],
  945. async err => {
  946. if (err) {
  947. err = await utils.runJob("GET_ERROR", { error: err });
  948. console.log(
  949. "ERROR",
  950. "STATIONS_UPDATE_DISPLAY_NAME",
  951. `Updating station "${stationId}" displayName to "${newDisplayName}" failed. "${err}"`
  952. );
  953. return cb({ status: "failure", message: err });
  954. }
  955. console.log(
  956. "SUCCESS",
  957. "STATIONS_UPDATE_DISPLAY_NAME",
  958. `Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`
  959. );
  960. return cb({
  961. status: "success",
  962. message: "Successfully updated the display name."
  963. });
  964. }
  965. );
  966. }),
  967. /**
  968. * Updates a station's description
  969. *
  970. * @param session
  971. * @param stationId - the station id
  972. * @param newDescription - the new station description
  973. * @param cb
  974. */
  975. updateDescription: isOwnerRequired(async (session, stationId, newDescription, cb) => {
  976. const stationModel = await db.runJob("GET_MODEL", {
  977. modelName: "station"
  978. });
  979. async.waterfall(
  980. [
  981. next => {
  982. stationModel.updateOne(
  983. { _id: stationId },
  984. { $set: { description: newDescription } },
  985. { runValidators: true },
  986. next
  987. );
  988. },
  989. (res, next) => {
  990. stations
  991. .runJob("UPDATE_STATION", { stationId })
  992. .then(station => {
  993. next(null, station);
  994. })
  995. .catch(next);
  996. }
  997. ],
  998. async err => {
  999. if (err) {
  1000. err = await utils.runJob("GET_ERROR", { error: err });
  1001. console.log(
  1002. "ERROR",
  1003. "STATIONS_UPDATE_DESCRIPTION",
  1004. `Updating station "${stationId}" description to "${newDescription}" failed. "${err}"`
  1005. );
  1006. return cb({ status: "failure", message: err });
  1007. }
  1008. console.log(
  1009. "SUCCESS",
  1010. "STATIONS_UPDATE_DESCRIPTION",
  1011. `Updated station "${stationId}" description to "${newDescription}" successfully.`
  1012. );
  1013. return cb({
  1014. status: "success",
  1015. message: "Successfully updated the description."
  1016. });
  1017. }
  1018. );
  1019. }),
  1020. /**
  1021. * Updates a station's privacy
  1022. *
  1023. * @param session
  1024. * @param stationId - the station id
  1025. * @param newPrivacy - the new station privacy
  1026. * @param cb
  1027. */
  1028. updatePrivacy: isOwnerRequired(async (session, stationId, newPrivacy, cb) => {
  1029. const stationModel = await db.runJob("GET_MODEL", {
  1030. modelName: "station"
  1031. });
  1032. async.waterfall(
  1033. [
  1034. next => {
  1035. stationModel.updateOne(
  1036. { _id: stationId },
  1037. { $set: { privacy: newPrivacy } },
  1038. { runValidators: true },
  1039. next
  1040. );
  1041. },
  1042. (res, next) => {
  1043. stations
  1044. .runJob("UPDATE_STATION", { stationId })
  1045. .then(station => {
  1046. next(null, station);
  1047. })
  1048. .catch(next);
  1049. }
  1050. ],
  1051. async err => {
  1052. if (err) {
  1053. err = await utils.runJob("GET_ERROR", { error: err });
  1054. console.log(
  1055. "ERROR",
  1056. "STATIONS_UPDATE_PRIVACY",
  1057. `Updating station "${stationId}" privacy to "${newPrivacy}" failed. "${err}"`
  1058. );
  1059. return cb({ status: "failure", message: err });
  1060. }
  1061. console.log(
  1062. "SUCCESS",
  1063. "STATIONS_UPDATE_PRIVACY",
  1064. `Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`
  1065. );
  1066. return cb({
  1067. status: "success",
  1068. message: "Successfully updated the privacy."
  1069. });
  1070. }
  1071. );
  1072. }),
  1073. /**
  1074. * Updates a station's genres
  1075. *
  1076. * @param session
  1077. * @param stationId - the station id
  1078. * @param newGenres - the new station genres
  1079. * @param cb
  1080. */
  1081. updateGenres: isOwnerRequired(async (session, stationId, newGenres, cb) => {
  1082. const stationModel = await db.runJob("GET_MODEL", {
  1083. modelName: "station"
  1084. });
  1085. async.waterfall(
  1086. [
  1087. next => {
  1088. stationModel.updateOne(
  1089. { _id: stationId },
  1090. { $set: { genres: newGenres } },
  1091. { runValidators: true },
  1092. next
  1093. );
  1094. },
  1095. (res, next) => {
  1096. stations
  1097. .runJob("UPDATE_STATION", { stationId })
  1098. .then(station => {
  1099. next(null, station);
  1100. })
  1101. .catch(next);
  1102. }
  1103. ],
  1104. async err => {
  1105. if (err) {
  1106. err = await utils.runJob("GET_ERROR", { error: err });
  1107. console.log(
  1108. "ERROR",
  1109. "STATIONS_UPDATE_GENRES",
  1110. `Updating station "${stationId}" genres to "${newGenres}" failed. "${err}"`
  1111. );
  1112. return cb({ status: "failure", message: err });
  1113. }
  1114. console.log(
  1115. "SUCCESS",
  1116. "STATIONS_UPDATE_GENRES",
  1117. `Updated station "${stationId}" genres to "${newGenres}" successfully.`
  1118. );
  1119. return cb({
  1120. status: "success",
  1121. message: "Successfully updated the genres."
  1122. });
  1123. }
  1124. );
  1125. }),
  1126. /**
  1127. * Updates a station's blacklisted genres
  1128. *
  1129. * @param session
  1130. * @param stationId - the station id
  1131. * @param newBlacklistedGenres - the new station blacklisted genres
  1132. * @param cb
  1133. */
  1134. updateBlacklistedGenres: isOwnerRequired(async (session, stationId, newBlacklistedGenres, cb) => {
  1135. const stationModel = await db.runJob("GET_MODEL", {
  1136. modelName: "station"
  1137. });
  1138. async.waterfall(
  1139. [
  1140. next => {
  1141. stationModel.updateOne(
  1142. { _id: stationId },
  1143. {
  1144. $set: {
  1145. blacklistedGenres: newBlacklistedGenres
  1146. }
  1147. },
  1148. { runValidators: true },
  1149. next
  1150. );
  1151. },
  1152. (res, next) => {
  1153. stations
  1154. .runJob("UPDATE_STATION", { stationId })
  1155. .then(station => {
  1156. next(null, station);
  1157. })
  1158. .catch(next);
  1159. }
  1160. ],
  1161. async err => {
  1162. if (err) {
  1163. err = await utils.runJob("GET_ERROR", { error: err });
  1164. console.log(
  1165. "ERROR",
  1166. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1167. `Updating station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" failed. "${err}"`
  1168. );
  1169. return cb({ status: "failure", message: err });
  1170. }
  1171. console.log(
  1172. "SUCCESS",
  1173. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1174. `Updated station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" successfully.`
  1175. );
  1176. return cb({
  1177. status: "success",
  1178. message: "Successfully updated the blacklisted genres."
  1179. });
  1180. }
  1181. );
  1182. }),
  1183. /**
  1184. * Updates a station's party mode
  1185. *
  1186. * @param session
  1187. * @param stationId - the station id
  1188. * @param newPartyMode - the new station party mode
  1189. * @param cb
  1190. */
  1191. updatePartyMode: isOwnerRequired(async (session, stationId, newPartyMode, cb) => {
  1192. const stationModel = await db.runJob("GET_MODEL", {
  1193. modelName: "station"
  1194. });
  1195. async.waterfall(
  1196. [
  1197. next => {
  1198. stations
  1199. .runJob("GET_STATION", { stationId })
  1200. .then(station => {
  1201. next(null, station);
  1202. })
  1203. .catch(next);
  1204. },
  1205. (station, next) => {
  1206. if (!station) return next("Station not found.");
  1207. if (station.partyMode === newPartyMode)
  1208. return next(`The party mode was already ${newPartyMode ? "enabled." : "disabled."}`);
  1209. return stationModel.updateOne(
  1210. { _id: stationId },
  1211. { $set: { partyMode: newPartyMode } },
  1212. { runValidators: true },
  1213. next
  1214. );
  1215. },
  1216. (res, next) => {
  1217. stations
  1218. .runJob("UPDATE_STATION", { stationId })
  1219. .then(station => {
  1220. next(null, station);
  1221. })
  1222. .catch(next);
  1223. }
  1224. ],
  1225. async err => {
  1226. if (err) {
  1227. err = await utils.runJob("GET_ERROR", { error: err });
  1228. console.log(
  1229. "ERROR",
  1230. "STATIONS_UPDATE_PARTY_MODE",
  1231. `Updating station "${stationId}" party mode to "${newPartyMode}" failed. "${err}"`
  1232. );
  1233. return cb({ status: "failure", message: err });
  1234. }
  1235. console.log(
  1236. "SUCCESS",
  1237. "STATIONS_UPDATE_PARTY_MODE",
  1238. `Updated station "${stationId}" party mode to "${newPartyMode}" successfully.`
  1239. );
  1240. cache.runJob("PUB", {
  1241. channel: "station.updatePartyMode",
  1242. value: {
  1243. stationId,
  1244. partyMode: newPartyMode
  1245. }
  1246. });
  1247. stations.runJob("SKIP_STATION", { stationId });
  1248. return cb({
  1249. status: "success",
  1250. message: "Successfully updated the party mode."
  1251. });
  1252. }
  1253. );
  1254. }),
  1255. /**
  1256. * Pauses a station
  1257. *
  1258. * @param session
  1259. * @param stationId - the station id
  1260. * @param cb
  1261. */
  1262. pause: isOwnerRequired(async (session, stationId, cb) => {
  1263. const stationModel = await db.runJob("GET_MODEL", {
  1264. modelName: "station"
  1265. });
  1266. async.waterfall(
  1267. [
  1268. next => {
  1269. stations
  1270. .runJob("GET_STATION", { stationId })
  1271. .then(station => {
  1272. next(null, station);
  1273. })
  1274. .catch(next);
  1275. },
  1276. (station, next) => {
  1277. if (!station) return next("Station not found.");
  1278. if (station.paused) return next("That station was already paused.");
  1279. return stationModel.updateOne(
  1280. { _id: stationId },
  1281. { $set: { paused: true, pausedAt: Date.now() } },
  1282. next
  1283. );
  1284. },
  1285. (res, next) => {
  1286. stations
  1287. .runJob("UPDATE_STATION", { stationId })
  1288. .then(station => {
  1289. next(null, station);
  1290. })
  1291. .catch(next);
  1292. }
  1293. ],
  1294. async err => {
  1295. if (err) {
  1296. err = await utils.runJob("GET_ERROR", { error: err });
  1297. console.log("ERROR", "STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  1298. return cb({ status: "failure", message: err });
  1299. }
  1300. console.log("SUCCESS", "STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  1301. cache.runJob("PUB", {
  1302. channel: "station.pause",
  1303. value: stationId
  1304. });
  1305. notifications.runJob("UNSCHEDULE", {
  1306. name: `stations.nextSong?id=${stationId}`
  1307. });
  1308. return cb({
  1309. status: "success",
  1310. message: "Successfully paused."
  1311. });
  1312. }
  1313. );
  1314. }),
  1315. /**
  1316. * Resumes a station
  1317. *
  1318. * @param session
  1319. * @param stationId - the station id
  1320. * @param cb
  1321. */
  1322. resume: isOwnerRequired(async (session, stationId, cb) => {
  1323. const stationModel = await db.runJob("GET_MODEL", {
  1324. modelName: "station"
  1325. });
  1326. async.waterfall(
  1327. [
  1328. next => {
  1329. stations
  1330. .runJob("GET_STATION", { stationId })
  1331. .then(station => {
  1332. next(null, station);
  1333. })
  1334. .catch(next);
  1335. },
  1336. (station, next) => {
  1337. if (!station) return next("Station not found.");
  1338. if (!station.paused) return next("That station is not paused.");
  1339. station.timePaused += Date.now() - station.pausedAt;
  1340. return stationModel.updateOne(
  1341. { _id: stationId },
  1342. {
  1343. $set: { paused: false },
  1344. $inc: { timePaused: Date.now() - station.pausedAt }
  1345. },
  1346. next
  1347. );
  1348. },
  1349. (res, next) => {
  1350. stations
  1351. .runJob("UPDATE_STATION", { stationId })
  1352. .then(station => {
  1353. next(null, station);
  1354. })
  1355. .catch(next);
  1356. }
  1357. ],
  1358. async err => {
  1359. if (err) {
  1360. err = await utils.runJob("GET_ERROR", { error: err });
  1361. console.log("ERROR", "STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  1362. return cb({ status: "failure", message: err });
  1363. }
  1364. console.log("SUCCESS", "STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  1365. cache.runJob("PUB", {
  1366. channel: "station.resume",
  1367. value: stationId
  1368. });
  1369. return cb({
  1370. status: "success",
  1371. message: "Successfully resumed."
  1372. });
  1373. }
  1374. );
  1375. }),
  1376. /**
  1377. * Removes a station
  1378. *
  1379. * @param session
  1380. * @param stationId - the station id
  1381. * @param cb
  1382. */
  1383. remove: isOwnerRequired(async (session, stationId, cb) => {
  1384. const stationModel = await db.runJob("GET_MODEL", {
  1385. modelName: "station"
  1386. });
  1387. async.waterfall(
  1388. [
  1389. next => {
  1390. stationModel.deleteOne({ _id: stationId }, err => next(err));
  1391. },
  1392. next => {
  1393. cache.runJob("HDEL", { table: "stations", key: stationId }).then(next).catch(next);
  1394. }
  1395. ],
  1396. async err => {
  1397. if (err) {
  1398. err = await utils.runJob("GET_ERROR", { error: err });
  1399. console.log("ERROR", "STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  1400. return cb({ status: "failure", message: err });
  1401. }
  1402. console.log("SUCCESS", "STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  1403. cache.runJob("PUB", {
  1404. channel: "station.remove",
  1405. value: stationId
  1406. });
  1407. activities.runJob("ADD_ACTIVITY", {
  1408. userId: session.userId,
  1409. activityType: "deleted_station",
  1410. payload: [stationId]
  1411. });
  1412. return cb({
  1413. status: "success",
  1414. message: "Successfully removed."
  1415. });
  1416. }
  1417. );
  1418. }),
  1419. /**
  1420. * Create a station
  1421. *
  1422. * @param session
  1423. * @param data - the station data
  1424. * @param cb
  1425. */
  1426. create: isLoginRequired(async (session, data, cb) => {
  1427. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  1428. const stationModel = await db.runJob("GET_MODEL", {
  1429. modelName: "station"
  1430. });
  1431. data.name = data.name.toLowerCase();
  1432. const blacklist = [
  1433. "country",
  1434. "edm",
  1435. "musare",
  1436. "hip-hop",
  1437. "rap",
  1438. "top-hits",
  1439. "todays-hits",
  1440. "old-school",
  1441. "christmas",
  1442. "about",
  1443. "support",
  1444. "staff",
  1445. "help",
  1446. "news",
  1447. "terms",
  1448. "privacy",
  1449. "profile",
  1450. "c",
  1451. "community",
  1452. "tos",
  1453. "login",
  1454. "register",
  1455. "p",
  1456. "official",
  1457. "o",
  1458. "trap",
  1459. "faq",
  1460. "team",
  1461. "donate",
  1462. "buy",
  1463. "shop",
  1464. "forums",
  1465. "explore",
  1466. "settings",
  1467. "admin",
  1468. "auth",
  1469. "reset_password"
  1470. ];
  1471. async.waterfall(
  1472. [
  1473. next => {
  1474. if (!data) return next("Invalid data.");
  1475. return next();
  1476. },
  1477. next => {
  1478. stationModel.findOne(
  1479. {
  1480. $or: [
  1481. { name: data.name },
  1482. {
  1483. displayName: new RegExp(`^${data.displayName}$`, "i")
  1484. }
  1485. ]
  1486. },
  1487. next
  1488. );
  1489. },
  1490. // eslint-disable-next-line consistent-return
  1491. (station, next) => {
  1492. console.log(station);
  1493. if (station) return next("A station with that name or display name already exists.");
  1494. const { name, displayName, description, genres, playlist, type, blacklistedGenres } = data;
  1495. if (type === "official") {
  1496. return userModel.findOne({ _id: session.userId }, (err, user) => {
  1497. if (err) return next(err);
  1498. if (!user) return next("User not found.");
  1499. if (user.role !== "admin") return next("Admin required.");
  1500. return stationModel.create(
  1501. {
  1502. name,
  1503. displayName,
  1504. description,
  1505. type,
  1506. privacy: "private",
  1507. playlist,
  1508. genres,
  1509. blacklistedGenres,
  1510. currentSong: stations.defaultSong
  1511. },
  1512. next
  1513. );
  1514. });
  1515. }
  1516. if (type === "community") {
  1517. if (blacklist.indexOf(name) !== -1)
  1518. return next("That name is blacklisted. Please use a different name.");
  1519. return stationModel.create(
  1520. {
  1521. name,
  1522. displayName,
  1523. description,
  1524. type,
  1525. privacy: "private",
  1526. owner: session.userId,
  1527. queue: [],
  1528. currentSong: null
  1529. },
  1530. next
  1531. );
  1532. }
  1533. }
  1534. ],
  1535. async (err, station) => {
  1536. if (err) {
  1537. err = await utils.runJob("GET_ERROR", { error: err });
  1538. console.log("ERROR", "STATIONS_CREATE", `Creating station failed. "${err}"`);
  1539. return cb({ status: "failure", message: err });
  1540. }
  1541. console.log("SUCCESS", "STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  1542. cache.runJob("PUB", {
  1543. channel: "station.create",
  1544. value: station._id
  1545. });
  1546. activities.runJob("ADD_ACTIVITY", {
  1547. userId: session.userId,
  1548. activityType: "created_station",
  1549. payload: [station._id]
  1550. });
  1551. return cb({
  1552. status: "success",
  1553. message: "Successfully created station."
  1554. });
  1555. }
  1556. );
  1557. }),
  1558. /**
  1559. * Adds song to station queue
  1560. *
  1561. * @param session
  1562. * @param stationId - the station id
  1563. * @param songId - the song id
  1564. * @param cb
  1565. */
  1566. addToQueue: isLoginRequired(async (session, stationId, songId, cb) => {
  1567. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  1568. const stationModel = await db.runJob("GET_MODEL", {
  1569. modelName: "station"
  1570. });
  1571. async.waterfall(
  1572. [
  1573. next => {
  1574. stations
  1575. .runJob("GET_STATION", { stationId })
  1576. .then(station => {
  1577. next(null, station);
  1578. })
  1579. .catch(next);
  1580. },
  1581. (station, next) => {
  1582. if (!station) return next("Station not found.");
  1583. if (station.locked) {
  1584. return userModel.findOne({ _id: session.userId }, (err, user) => {
  1585. if (user.role !== "admin" && station.owner !== session.userId)
  1586. return next("Only owners and admins can add songs to a locked queue.");
  1587. return next(null, station);
  1588. });
  1589. }
  1590. return next(null, station);
  1591. },
  1592. (station, next) => {
  1593. if (station.type !== "community") return next("That station is not a community station.");
  1594. return stations
  1595. .runJob("CAN_USER_VIEW_STATION", {
  1596. station,
  1597. userId: session.userId
  1598. })
  1599. .then(canView => {
  1600. if (canView) return next(null, station);
  1601. return next("Insufficient permissions.");
  1602. })
  1603. .catch(err => next(err));
  1604. },
  1605. (station, next) => {
  1606. if (station.currentSong && station.currentSong.songId === songId)
  1607. return next("That song is currently playing.");
  1608. return async.each(
  1609. station.queue,
  1610. (queueSong, next) => {
  1611. if (queueSong.songId === songId) return next("That song is already in the queue.");
  1612. return next();
  1613. },
  1614. err => next(err, station)
  1615. );
  1616. },
  1617. (station, next) => {
  1618. // songs
  1619. // .runJob("GET_SONG", { id: songId })
  1620. // .then((song) => {
  1621. // if (song) return next(null, song, station);
  1622. // else {
  1623. utils
  1624. .runJob("GET_SONG_FROM_YOUTUBE", { songId })
  1625. .then(response => {
  1626. const { song } = response;
  1627. song.artists = [];
  1628. song.skipDuration = 0;
  1629. song.likes = -1;
  1630. song.dislikes = -1;
  1631. song.thumbnail = "empty";
  1632. song.explicit = false;
  1633. next(null, song, station);
  1634. })
  1635. .catch(err => {
  1636. next(err);
  1637. });
  1638. // }
  1639. // })
  1640. // .catch((err) => {
  1641. // next(err);
  1642. // });
  1643. },
  1644. (song, station, next) => {
  1645. const { queue } = station;
  1646. song.requestedBy = session.userId;
  1647. queue.push(song);
  1648. let totalDuration = 0;
  1649. queue.forEach(song => {
  1650. totalDuration += song.duration;
  1651. });
  1652. if (totalDuration >= 3600 * 3) return next("The max length of the queue is 3 hours.");
  1653. return next(null, song, station);
  1654. },
  1655. (song, station, next) => {
  1656. const { queue } = station;
  1657. if (queue.length === 0) return next(null, song, station);
  1658. let totalDuration = 0;
  1659. const userId = queue[queue.length - 1].requestedBy;
  1660. station.queue.forEach(song => {
  1661. if (userId === song.requestedBy) {
  1662. totalDuration += song.duration;
  1663. }
  1664. });
  1665. if (totalDuration >= 900) return next("The max length of songs per user is 15 minutes.");
  1666. return next(null, song, station);
  1667. },
  1668. (song, station, next) => {
  1669. const { queue } = station;
  1670. if (queue.length === 0) return next(null, song);
  1671. let totalSongs = 0;
  1672. const userId = queue[queue.length - 1].requestedBy;
  1673. queue.forEach(song => {
  1674. if (userId === song.requestedBy) {
  1675. totalSongs += 1;
  1676. }
  1677. });
  1678. if (totalSongs <= 2) return next(null, song);
  1679. if (totalSongs > 3)
  1680. return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  1681. if (queue[queue.length - 2].requestedBy !== userId || queue[queue.length - 3] !== userId)
  1682. return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  1683. return next(null, song);
  1684. },
  1685. (song, next) => {
  1686. stationModel.updateOne(
  1687. { _id: stationId },
  1688. { $push: { queue: song } },
  1689. { runValidators: true },
  1690. next
  1691. );
  1692. },
  1693. (res, next) => {
  1694. stations
  1695. .runJob("UPDATE_STATION", { stationId })
  1696. .then(station => {
  1697. next(null, station);
  1698. })
  1699. .catch(next);
  1700. }
  1701. ],
  1702. async err => {
  1703. if (err) {
  1704. err = await utils.runJob("GET_ERROR", { error: err });
  1705. console.log(
  1706. "ERROR",
  1707. "STATIONS_ADD_SONG_TO_QUEUE",
  1708. `Adding song "${songId}" to station "${stationId}" queue failed. "${err}"`
  1709. );
  1710. return cb({ status: "failure", message: err });
  1711. }
  1712. console.log(
  1713. "SUCCESS",
  1714. "STATIONS_ADD_SONG_TO_QUEUE",
  1715. `Added song "${songId}" to station "${stationId}" successfully.`
  1716. );
  1717. cache.runJob("PUB", {
  1718. channel: "station.queueUpdate",
  1719. value: stationId
  1720. });
  1721. return cb({
  1722. status: "success",
  1723. message: "Successfully added song to queue."
  1724. });
  1725. }
  1726. );
  1727. }),
  1728. /**
  1729. * Removes song from station queue
  1730. *
  1731. * @param session
  1732. * @param stationId - the station id
  1733. * @param songId - the song id
  1734. * @param cb
  1735. */
  1736. removeFromQueue: isOwnerRequired(async (session, stationId, songId, cb) => {
  1737. const stationModel = await db.runJob("GET_MODEL", {
  1738. modelName: "station"
  1739. });
  1740. async.waterfall(
  1741. [
  1742. next => {
  1743. if (!songId) return next("Invalid song id.");
  1744. return stations
  1745. .runJob("GET_STATION", { stationId })
  1746. .then(station => {
  1747. next(null, station);
  1748. })
  1749. .catch(next);
  1750. },
  1751. (station, next) => {
  1752. if (!station) return next("Station not found.");
  1753. if (station.type !== "community") return next("Station is not a community station.");
  1754. return async.each(
  1755. station.queue,
  1756. (queueSong, next) => {
  1757. if (queueSong.songId === songId) return next(true);
  1758. return next();
  1759. },
  1760. err => {
  1761. if (err === true) return next();
  1762. return next("Song is not currently in the queue.");
  1763. }
  1764. );
  1765. },
  1766. next => {
  1767. stationModel.updateOne({ _id: stationId }, { $pull: { queue: { songId } } }, next);
  1768. },
  1769. (res, next) => {
  1770. stations
  1771. .runJob("UPDATE_STATION", { stationId })
  1772. .then(station => {
  1773. next(null, station);
  1774. })
  1775. .catch(next);
  1776. }
  1777. ],
  1778. async err => {
  1779. if (err) {
  1780. err = await utils.runJob("GET_ERROR", { error: err });
  1781. console.log(
  1782. "ERROR",
  1783. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1784. `Removing song "${songId}" from station "${stationId}" queue failed. "${err}"`
  1785. );
  1786. return cb({ status: "failure", message: err });
  1787. }
  1788. console.log(
  1789. "SUCCESS",
  1790. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1791. `Removed song "${songId}" from station "${stationId}" successfully.`
  1792. );
  1793. cache.runJob("PUB", {
  1794. channel: "station.queueUpdate",
  1795. value: stationId
  1796. });
  1797. return cb({
  1798. status: "success",
  1799. message: "Successfully removed song from queue."
  1800. });
  1801. }
  1802. );
  1803. }),
  1804. /**
  1805. * Gets the queue from a station
  1806. *
  1807. * @param {object} session - user session
  1808. * @param {string} stationId - the station id
  1809. * @param {Function} cb - callback
  1810. */
  1811. getQueue: (session, stationId, cb) => {
  1812. async.waterfall(
  1813. [
  1814. next => {
  1815. stations
  1816. .runJob("GET_STATION", { stationId })
  1817. .then(station => {
  1818. next(null, station);
  1819. })
  1820. .catch(next);
  1821. },
  1822. (station, next) => {
  1823. if (!station) return next("Station not found.");
  1824. if (station.type !== "community") return next("Station is not a community station.");
  1825. return next(null, station);
  1826. },
  1827. (station, next) => {
  1828. stations
  1829. .runJob("CAN_USER_VIEW_STATION", {
  1830. station,
  1831. userId: session.userId
  1832. })
  1833. .then(canView => {
  1834. if (canView) return next(null, station);
  1835. return next("Insufficient permissions.");
  1836. })
  1837. .catch(err => next(err));
  1838. }
  1839. ],
  1840. async (err, station) => {
  1841. if (err) {
  1842. err = await utils.runJob("GET_ERROR", { error: err });
  1843. console.log(
  1844. "ERROR",
  1845. "STATIONS_GET_QUEUE",
  1846. `Getting queue for station "${stationId}" failed. "${err}"`
  1847. );
  1848. return cb({ status: "failure", message: err });
  1849. }
  1850. console.log("SUCCESS", "STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  1851. return cb({
  1852. status: "success",
  1853. message: "Successfully got queue.",
  1854. queue: station.queue
  1855. });
  1856. }
  1857. );
  1858. },
  1859. /**
  1860. * Selects a private playlist for a station
  1861. *
  1862. * @param session
  1863. * @param stationId - the station id
  1864. * @param playlistId - the private playlist id
  1865. * @param cb
  1866. */
  1867. selectPrivatePlaylist: isOwnerRequired(async (session, stationId, playlistId, cb) => {
  1868. const stationModel = await db.runJob("GET_MODEL", {
  1869. modelName: "station"
  1870. });
  1871. const playlistModel = await db.runJob("GET_MODEL", {
  1872. modelName: "playlist"
  1873. });
  1874. async.waterfall(
  1875. [
  1876. next => {
  1877. stations
  1878. .runJob("GET_STATION", { stationId })
  1879. .then(station => {
  1880. next(null, station);
  1881. })
  1882. .catch(next);
  1883. },
  1884. (station, next) => {
  1885. if (!station) return next("Station not found.");
  1886. if (station.type !== "community") return next("Station is not a community station.");
  1887. if (station.privatePlaylist === playlistId)
  1888. return next("That private playlist is already selected.");
  1889. return playlistModel.findOne({ _id: playlistId }, next);
  1890. },
  1891. (playlist, next) => {
  1892. if (!playlist) return next("Playlist not found.");
  1893. const currentSongIndex = playlist.songs.length > 0 ? playlist.songs.length - 1 : 0;
  1894. return stationModel.updateOne(
  1895. { _id: stationId },
  1896. {
  1897. $set: {
  1898. privatePlaylist: playlistId,
  1899. currentSongIndex
  1900. }
  1901. },
  1902. { runValidators: true },
  1903. next
  1904. );
  1905. },
  1906. (res, next) => {
  1907. stations
  1908. .runJob("UPDATE_STATION", { stationId })
  1909. .then(station => {
  1910. next(null, station);
  1911. })
  1912. .catch(next);
  1913. }
  1914. ],
  1915. async (err, station) => {
  1916. if (err) {
  1917. err = await utils.runJob("GET_ERROR", { error: err });
  1918. console.log(
  1919. "ERROR",
  1920. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  1921. `Selecting private playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  1922. );
  1923. return cb({ status: "failure", message: err });
  1924. }
  1925. console.log(
  1926. "SUCCESS",
  1927. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  1928. `Selected private playlist "${playlistId}" for station "${stationId}" successfully.`
  1929. );
  1930. notifications.runJob("UNSCHEDULE", {
  1931. name: `stations.nextSong?id${stationId}`
  1932. });
  1933. if (!station.partyMode) stations.runJob("SKIP_STATION", { stationId });
  1934. cache.runJob("PUB", {
  1935. channel: "privatePlaylist.selected",
  1936. value: {
  1937. playlistId,
  1938. stationId
  1939. }
  1940. });
  1941. return cb({
  1942. status: "success",
  1943. message: "Successfully selected playlist."
  1944. });
  1945. }
  1946. );
  1947. }),
  1948. favoriteStation: isLoginRequired(async (session, stationId, cb) => {
  1949. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  1950. async.waterfall(
  1951. [
  1952. next => {
  1953. stations
  1954. .runJob("GET_STATION", { stationId })
  1955. .then(station => {
  1956. next(null, station);
  1957. })
  1958. .catch(next);
  1959. },
  1960. (station, next) => {
  1961. if (!station) return next("Station not found.");
  1962. return stations
  1963. .runJob("CAN_USER_VIEW_STATION", {
  1964. station,
  1965. userId: session.userId
  1966. })
  1967. .then(canView => {
  1968. if (canView) return next();
  1969. return next("Insufficient permissions.");
  1970. })
  1971. .catch(err => next(err));
  1972. },
  1973. next => {
  1974. userModel.updateOne({ _id: session.userId }, { $addToSet: { favoriteStations: stationId } }, next);
  1975. },
  1976. (res, next) => {
  1977. if (res.nModified === 0) return next("The station was already favorited.");
  1978. return next();
  1979. }
  1980. ],
  1981. async err => {
  1982. if (err) {
  1983. err = await utils.runJob("GET_ERROR", { error: err });
  1984. console.log("ERROR", "FAVORITE_STATION", `Favoriting station "${stationId}" failed. "${err}"`);
  1985. return cb({ status: "failure", message: err });
  1986. }
  1987. console.log("SUCCESS", "FAVORITE_STATION", `Favorited station "${stationId}" successfully.`);
  1988. cache.runJob("PUB", {
  1989. channel: "user.favoritedStation",
  1990. value: {
  1991. userId: session.userId,
  1992. stationId
  1993. }
  1994. });
  1995. return cb({
  1996. status: "success",
  1997. message: "Succesfully favorited station."
  1998. });
  1999. }
  2000. );
  2001. }),
  2002. unfavoriteStation: isLoginRequired(async (session, stationId, cb) => {
  2003. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  2004. async.waterfall(
  2005. [
  2006. next => {
  2007. userModel.updateOne({ _id: session.userId }, { $pull: { favoriteStations: stationId } }, next);
  2008. },
  2009. (res, next) => {
  2010. if (res.nModified === 0) return next("The station wasn't favorited.");
  2011. return next();
  2012. }
  2013. ],
  2014. async err => {
  2015. if (err) {
  2016. err = await utils.runJob("GET_ERROR", { error: err });
  2017. console.log("ERROR", "UNFAVORITE_STATION", `Unfavoriting station "${stationId}" failed. "${err}"`);
  2018. return cb({ status: "failure", message: err });
  2019. }
  2020. console.log("SUCCESS", "UNFAVORITE_STATION", `Unfavorited station "${stationId}" successfully.`);
  2021. cache.runJob("PUB", {
  2022. channel: "user.unfavoritedStation",
  2023. value: {
  2024. userId: session.userId,
  2025. stationId
  2026. }
  2027. });
  2028. return cb({
  2029. status: "success",
  2030. message: "Succesfully unfavorited station."
  2031. });
  2032. }
  2033. );
  2034. })
  2035. };