stations.js 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151
  1. import async from "async";
  2. import { isLoginRequired, isOwnerRequired } from "./hooks";
  3. import db from "../db";
  4. import utils from "../utils";
  5. import songs from "../songs";
  6. import cache from "../cache";
  7. import notifications from "../notifications";
  8. import stations from "../stations";
  9. import activities from "../activities";
  10. import YouTubeModule from "../youtube";
  11. // const logger = moduleManager.modules["logger"];
  // Socket id -> _id of the station that socket joined. Written by `join` and removed by `leave`.
  12. const userList = {};
  // Station id -> list of users in that station; readers fall back to `[]` when a station has no entry.
  // NOTE(review): in the visible code the only writer is the disabled interval below, which pushes
  // usernames and also reassigns this binding, so it would need `let` if that interval is re-enabled.
  13. const usersPerStation = {};
  // Station id -> number of users in that station; readers fall back to `0` when a station has no entry.
  // NOTE(review): same caveat as above - only the disabled interval writes to (and reassigns) this map.
  14. const usersPerStationCount = {};
  15. // Temporarily disabled until the messages in console can be limited
  16. // setInterval(async () => {
  17. // const stationsCountUpdated = [];
  18. // const stationsUpdated = [];
  19. // const oldUsersPerStation = usersPerStation;
  20. // usersPerStation = {};
  21. // const oldUsersPerStationCount = usersPerStationCount;
  22. // usersPerStationCount = {};
  23. // const userModel = await db.runJob("GET_MODEL", {
  24. // modelName: "user"
  25. // });
  26. // async.each(
  27. // Object.keys(userList),
  28. // (socketId, next) => {
  29. // utils.runJob("SOCKET_FROM_SESSION", { socketId }, { isQuiet: true }).then(socket => {
  30. // const stationId = userList[socketId];
  31. // if (!socket || Object.keys(socket.rooms).indexOf(`station.${stationId}`) === -1) {
  32. // if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  33. // if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
  34. // delete userList[socketId];
  35. // return next();
  36. // }
  37. // if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0;
  38. // usersPerStationCount[stationId] += 1;
  39. // if (!usersPerStation[stationId]) usersPerStation[stationId] = [];
  40. // return async.waterfall(
  41. // [
  42. // next => {
  43. // if (!socket.session || !socket.session.sessionId) return next("No session found.");
  44. // return cache
  45. // .runJob("HGET", {
  46. // table: "sessions",
  47. // key: socket.session.sessionId
  48. // })
  49. // .then(session => {
  50. // next(null, session);
  51. // })
  52. // .catch(next);
  53. // },
  54. // (session, next) => {
  55. // if (!session) return next("Session not found.");
  56. // return userModel.findOne({ _id: session.userId }, next);
  57. // },
  58. // (user, next) => {
  59. // if (!user) return next("User not found.");
  60. // if (usersPerStation[stationId].indexOf(user.username) !== -1)
  61. // return next("User already in the list.");
  62. // return next(null, user.username);
  63. // }
  64. // ],
  65. // (err, username) => {
  66. // if (!err) {
  67. // usersPerStation[stationId].push(username);
  68. // }
  69. // next();
  70. // }
  71. // );
  72. // });
  73. // // TODO Code to show users
  74. // },
  75. // () => {
  76. // Object.keys(usersPerStationCount).forEach(stationId => {
  77. // if (oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]) {
  78. // if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
  79. // }
  80. // })
  81. //
  82. // Object.keys(usersPerStation).forEach(stationId => {
  83. // if (
  84. // _.difference(usersPerStation[stationId], oldUsersPerStation[stationId]).length > 0 ||
  85. // _.difference(oldUsersPerStation[stationId], usersPerStation[stationId]).length > 0
  86. // ) {
  87. // if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
  88. // }
  89. // });
  90. //
  91. // stationsCountUpdated.forEach(stationId => {
  92. // console.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
  93. // cache.runJob("PUB", {
  94. // table: "station.updateUserCount",
  95. // value: stationId
  96. // });
  97. // });
  98. // stationsUpdated.forEach(stationId => {
  99. // console.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
  100. // cache.runJob("PUB", {
  101. // table: "station.updateUsers",
  102. // value: stationId
  103. // });
  104. // });
  105. // // console.log("Userlist", usersPerStation);
  106. // }
  107. // );
  108. // }, 3000);
  // When "station.updateUsers" is published, push the station's current user list
  // (or an empty list when none is tracked) to every socket in that station's room.
  cache.runJob("SUB", {
    channel: "station.updateUsers",
    cb: stationId => {
      const room = `station.${stationId}`;
      const users = usersPerStation[stationId] || [];
      utils.runJob("EMIT_TO_ROOM", { room, args: ["event:users.updated", users] });
    }
  });
  // When "station.updateUserCount" is published:
  //  1. send the new count to everyone inside the station's own room;
  //  2. send it to the "home" room - to everybody when the station is public, otherwise only
  //     to sockets whose user is an admin or the owner of the (community) station.
  cache.runJob("SUB", {
    channel: "station.updateUserCount",
    cb: stationId => {
      const count = usersPerStationCount[stationId] || 0;

      utils.runJob("EMIT_TO_ROOM", {
        room: `station.${stationId}`,
        args: ["event:userCount.updated", count]
      });

      stations
        .runJob("GET_STATION", { stationId })
        .then(async station => {
          // The station may have been removed between the publish and this lookup.
          if (!station) return;

          if (station.privacy === "public") {
            utils.runJob("EMIT_TO_ROOM", {
              room: "home",
              args: ["event:userCount.updated", stationId, count]
            });
            return;
          }

          const sockets = await utils.runJob("GET_ROOM_SOCKETS", { room: "home" });
          // Resolve the model once instead of once per socket.
          const userModel = await db.runJob("GET_MODEL", { modelName: "user" });

          Object.keys(sockets).forEach(socketKey => {
            const socket = sockets[socketKey];
            // Sockets without a session cannot be matched to a user, so they are skipped.
            const sessionId = socket && socket.session && socket.session.sessionId;
            if (!sessionId) return;

            cache
              .runJob("HGET", { table: "sessions", key: sessionId })
              .then(storedSession => {
                if (!storedSession) return;

                userModel.findOne({ _id: storedSession.userId }, (err, user) => {
                  // A failed lookup or a deleted user simply means "do not notify".
                  if (err || !user) return;

                  const isAdmin = user.role === "admin";
                  const isOwner = station.type === "community" && station.owner === storedSession.userId;
                  if (isAdmin || isOwner) socket.emit("event:userCount.updated", stationId, count);
                });
              })
              .catch(err => {
                console.log(
                  "ERROR",
                  "STATIONS_UPDATE_USER_COUNT",
                  `Looking up session for a home socket failed. "${err}"`
                );
              });
          });
        })
        .catch(err => {
          console.log(
            "ERROR",
            "STATIONS_UPDATE_USER_COUNT",
            `Broadcasting user count of station "${stationId}" failed. "${err}"`
          );
        });
    }
  });
  // Forward queue-lock changes to every socket in the affected station's room.
  cache.runJob("SUB", {
    channel: "station.queueLockToggled",
    cb: data => {
      const room = `station.${data.stationId}`;
      utils.runJob("EMIT_TO_ROOM", { room, args: ["event:queueLockToggled", data.locked] });
    }
  });

  // Forward party-mode changes to every socket in the affected station's room.
  cache.runJob("SUB", {
    channel: "station.updatePartyMode",
    cb: data => {
      const room = `station.${data.stationId}`;
      utils.runJob("EMIT_TO_ROOM", { room, args: ["event:partyMode.updated", data.partyMode] });
    }
  });

  // Forward the newly selected private playlist id to every socket in the affected station's room.
  cache.runJob("SUB", {
    channel: "privatePlaylist.selected",
    cb: data => {
      const room = `station.${data.stationId}`;
      utils.runJob("EMIT_TO_ROOM", { room, args: ["event:privatePlaylist.selected", data.playlistId] });
    }
  });
  // On "station.pause": look the station up and tell its room when it was paused.
  cache.runJob("SUB", {
    channel: "station.pause",
    cb: stationId => {
      stations
        .runJob("GET_STATION", { stationId })
        .then(station => {
          // Nothing to broadcast if the station no longer exists.
          if (!station) return;
          utils.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:stations.pause", { pausedAt: station.pausedAt }]
          });
        })
        .catch(err => {
          console.log("ERROR", "STATIONS_PAUSE_SUB", `Broadcasting pause of station "${stationId}" failed. "${err}"`);
        });
    }
  });

  // On "station.resume": look the station up and tell its room the accumulated paused time.
  cache.runJob("SUB", {
    channel: "station.resume",
    cb: stationId => {
      stations
        .runJob("GET_STATION", { stationId })
        .then(station => {
          if (!station) return;
          utils.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:stations.resume", { timePaused: station.timePaused }]
          });
        })
        .catch(err => {
          console.log(
            "ERROR",
            "STATIONS_RESUME_SUB",
            `Broadcasting resume of station "${stationId}" failed. "${err}"`
          );
        });
    }
  });

  // On "station.queueUpdate": look the station up and send its current queue to its room.
  cache.runJob("SUB", {
    channel: "station.queueUpdate",
    cb: stationId => {
      stations
        .runJob("GET_STATION", { stationId })
        .then(station => {
          if (!station) return;
          utils.runJob("EMIT_TO_ROOM", {
            room: `station.${stationId}`,
            args: ["event:queue.update", station.queue]
          });
        })
        .catch(err => {
          console.log(
            "ERROR",
            "STATIONS_QUEUE_UPDATE_SUB",
            `Broadcasting queue of station "${stationId}" failed. "${err}"`
          );
        });
    }
  });
  // Tell everyone in the station's room that a skip vote was cast.
  cache.runJob("SUB", {
    channel: "station.voteSkipSong",
    cb: stationId => {
      const room = `station.${stationId}`;
      utils.runJob("EMIT_TO_ROOM", { room, args: ["event:song.voteSkipSong"] });
    }
  });

  // Announce a removed station to its own room first, then to the admin stations room.
  cache.runJob("SUB", {
    channel: "station.remove",
    cb: stationId => {
      const announcements = [
        { room: `station.${stationId}`, args: ["event:stations.remove"] },
        { room: "admin.stations", args: ["event:admin.station.removed", stationId] }
      ];
      announcements.forEach(payload => {
        utils.runJob("EMIT_TO_ROOM", payload);
      });
    }
  });
  // When "station.create" is published: initialize the station, announce it to the admin
  // stations room, then announce it on the "home" room - to everybody when the station is
  // public, otherwise only to admins and to the owner of the (community) station.
  cache.runJob("SUB", {
    channel: "station.create",
    cb: async stationId => {
      try {
        const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
        const { station } = await stations.runJob("INITIALIZE_STATION", { stationId });

        station.userCount = usersPerStationCount[stationId] || 0;

        utils.runJob("EMIT_TO_ROOM", {
          room: "admin.stations",
          args: ["event:admin.station.added", station]
        });

        // TODO If community, check if on whitelist
        if (station.privacy === "public") {
          utils.runJob("EMIT_TO_ROOM", {
            room: "home",
            args: ["event:stations.created", station]
          });
          return;
        }

        const sockets = await utils.runJob("GET_ROOM_SOCKETS", { room: "home" });

        Object.keys(sockets).forEach(socketKey => {
          const socket = sockets[socketKey];
          // Sockets without a session cannot be matched to a user, so they are skipped.
          const sessionId = socket && socket.session && socket.session.sessionId;
          if (!sessionId) return;

          cache
            .runJob("HGET", { table: "sessions", key: sessionId })
            .then(storedSession => {
              if (!storedSession) return;

              userModel.findOne({ _id: storedSession.userId }, (err, user) => {
                // A failed lookup or a deleted user simply means "do not notify".
                if (err || !user) return;

                const isAdmin = user.role === "admin";
                const isOwner = station.type === "community" && station.owner === storedSession.userId;
                if (isAdmin || isOwner) socket.emit("event:stations.created", station);
              });
            })
            .catch(err => {
              console.log(
                "ERROR",
                "STATIONS_CREATE_SUB",
                `Looking up session for a home socket failed. "${err}"`
              );
            });
        });
      } catch (err) {
        // Without this, a rejected job would surface as an unhandled promise rejection.
        console.log("ERROR", "STATIONS_CREATE_SUB", `Announcing station "${stationId}" failed. "${err}"`);
      }
    }
  });
  292. export default {
  293. /**
  294. * Get a list of all the stations
  295. *
  296. * @param {object} session - user session
  297. * @param {Function} cb - callback
  298. */
  299. index: (session, cb) => {
  300. async.waterfall(
  301. [
  302. next => {
  303. cache.runJob("HGETALL", { table: "stations" }).then(stations => {
  304. next(null, stations);
  305. });
  306. },
  307. (items, next) => {
  308. const filteredStations = [];
  309. async.each(
  310. items,
  311. (station, next) => {
  312. async.waterfall(
  313. [
  314. next => {
  315. stations
  316. .runJob("CAN_USER_VIEW_STATION", {
  317. station,
  318. userId: session.userId,
  319. hideUnlisted: true
  320. })
  321. .then(exists => {
  322. next(null, exists);
  323. })
  324. .catch(next);
  325. }
  326. ],
  327. (err, exists) => {
  328. if (err) console.log(err);
  329. station.userCount = usersPerStationCount[station._id] || 0;
  330. if (exists) filteredStations.push(station);
  331. next();
  332. }
  333. );
  334. },
  335. () => next(null, filteredStations)
  336. );
  337. }
  338. ],
  339. async (err, stations) => {
  340. if (err) {
  341. err = await utils.runJob("GET_ERROR", { error: err });
  342. console.log("ERROR", "STATIONS_INDEX", `Indexing stations failed. "${err}"`);
  343. return cb({ status: "failure", message: err });
  344. }
  345. console.log("SUCCESS", "STATIONS_INDEX", `Indexing stations successful.`, false);
  346. return cb({ status: "success", stations });
  347. }
  348. );
  349. },
  350. /**
  351. * Obtains basic metadata of a station in order to format an activity
  352. *
  353. * @param {object} session - user session
  354. * @param {string} stationId - the station id
  355. * @param {Function} cb - callback
  356. */
  357. getStationForActivity: (session, stationId, cb) => {
  358. async.waterfall(
  359. [
  360. next => {
  361. stations
  362. .runJob("GET_STATION", { stationId })
  363. .then(station => {
  364. next(null, station);
  365. })
  366. .catch(next);
  367. }
  368. ],
  369. async (err, station) => {
  370. if (err) {
  371. err = await utils.runJob("GET_ERROR", { error: err });
  372. console.log(
  373. "ERROR",
  374. "STATIONS_GET_STATION_FOR_ACTIVITY",
  375. `Failed to obtain metadata of station ${stationId} for activity formatting. "${err}"`
  376. );
  377. return cb({ status: "failure", message: err });
  378. }
  379. console.log(
  380. "SUCCESS",
  381. "STATIONS_GET_STATION_FOR_ACTIVITY",
  382. `Obtained metadata of station ${stationId} for activity formatting successfully.`
  383. );
  384. return cb({
  385. status: "success",
  386. data: {
  387. title: station.displayName,
  388. thumbnail: station.currentSong ? station.currentSong.thumbnail : ""
  389. }
  390. });
  391. }
  392. );
  393. },
  394. /**
  395. * Verifies that a station exists
  396. *
  397. * @param {object} session - user session
  398. * @param {string} stationName - the station name
  399. * @param {Function} cb - callback
  400. */
  401. existsByName: (session, stationName, cb) => {
  402. async.waterfall(
  403. [
  404. next => {
  405. stations
  406. .runJob("GET_STATION_BY_NAME", { stationName })
  407. .then(station => {
  408. next(null, station);
  409. })
  410. .catch(next);
  411. },
  412. (station, next) => {
  413. if (!station) return next(null, false);
  414. return stations
  415. .runJob("CAN_USER_VIEW_STATION", {
  416. station,
  417. userId: session.userId
  418. })
  419. .then(exists => {
  420. next(null, exists);
  421. })
  422. .catch(next);
  423. }
  424. ],
  425. async (err, exists) => {
  426. if (err) {
  427. err = await utils.runJob("GET_ERROR", { error: err });
  428. console.log(
  429. "ERROR",
  430. "STATION_EXISTS_BY_NAME",
  431. `Checking if station "${stationName}" exists failed. "${err}"`
  432. );
  433. return cb({ status: "failure", message: err });
  434. }
  435. console.log(
  436. "SUCCESS",
  437. "STATION_EXISTS_BY_NAME",
  438. `Station "${stationName}" exists successfully.` /* , false */
  439. );
  440. return cb({ status: "success", exists });
  441. }
  442. );
  443. },
  444. /**
  445. * Gets the official playlist for a station
  446. *
  447. * @param {object} session - user session
  448. * @param {string} stationId - the station id
  449. * @param {Function} cb - callback
  450. */
  451. getPlaylist: (session, stationId, cb) => {
  452. async.waterfall(
  453. [
  454. next => {
  455. stations
  456. .runJob("GET_STATION", { stationId })
  457. .then(station => {
  458. next(null, station);
  459. })
  460. .catch(next);
  461. },
  462. (station, next) => {
  463. stations
  464. .runJob("CAN_USER_VIEW_STATION", {
  465. station,
  466. userId: session.userId
  467. })
  468. .then(canView => {
  469. if (canView) return next(null, station);
  470. return next("Insufficient permissions.");
  471. })
  472. .catch(err => next(err));
  473. },
  474. (station, next) => {
  475. if (!station) return next("Station not found.");
  476. if (station.type !== "official") return next("This is not an official station.");
  477. return next();
  478. },
  479. next => {
  480. cache
  481. .runJob("HGET", {
  482. table: "officialPlaylists",
  483. key: stationId
  484. })
  485. .then(playlist => {
  486. next(null, playlist);
  487. })
  488. .catch(next);
  489. },
  490. (playlist, next) => {
  491. if (!playlist) return next("Playlist not found.");
  492. return next(null, playlist);
  493. }
  494. ],
  495. async (err, playlist) => {
  496. if (err) {
  497. err = await utils.runJob("GET_ERROR", { error: err });
  498. console.log(
  499. "ERROR",
  500. "STATIONS_GET_PLAYLIST",
  501. `Getting playlist for station "${stationId}" failed. "${err}"`
  502. );
  503. return cb({ status: "failure", message: err });
  504. }
  505. console.log(
  506. "SUCCESS",
  507. "STATIONS_GET_PLAYLIST",
  508. `Got playlist for station "${stationId}" successfully.`,
  509. false
  510. );
  511. return cb({ status: "success", data: playlist.songs });
  512. }
  513. );
  514. },
  515. /**
  516. * Joins the station by its name
  517. *
  518. * @param {object} session - user session
  519. * @param {string} stationName - the station name
  520. * @param {Function} cb - callback
  521. */
  522. join: (session, stationName, cb) => {
  523. async.waterfall(
  524. [
  525. next => {
  526. stations
  527. .runJob("GET_STATION_BY_NAME", { stationName })
  528. .then(station => {
  529. next(null, station);
  530. })
  531. .catch(next);
  532. },
  533. (station, next) => {
  534. if (!station) return next("Station not found.");
  535. return stations
  536. .runJob("CAN_USER_VIEW_STATION", {
  537. station,
  538. userId: session.userId
  539. })
  540. .then(canView => {
  541. if (!canView) next("Not allowed to join station.");
  542. else next(null, station);
  543. })
  544. .catch(err => next(err));
  545. },
  546. (station, next) => {
  547. utils.runJob("SOCKET_JOIN_ROOM", {
  548. socketId: session.socketId,
  549. room: `station.${station._id}`
  550. });
  551. const data = {
  552. _id: station._id,
  553. type: station.type,
  554. currentSong: station.currentSong,
  555. startedAt: station.startedAt,
  556. paused: station.paused,
  557. timePaused: station.timePaused,
  558. pausedAt: station.pausedAt,
  559. description: station.description,
  560. displayName: station.displayName,
  561. privacy: station.privacy,
  562. locked: station.locked,
  563. partyMode: station.partyMode,
  564. owner: station.owner,
  565. privatePlaylist: station.privatePlaylist
  566. };
  567. userList[session.socketId] = station._id;
  568. next(null, data);
  569. },
  570. (data, next) => {
  571. data = JSON.parse(JSON.stringify(data));
  572. data.userCount = usersPerStationCount[data._id] || 0;
  573. data.users = usersPerStation[data._id] || [];
  574. if (!data.currentSong || !data.currentSong.title) return next(null, data);
  575. utils.runJob("SOCKET_JOIN_SONG_ROOM", {
  576. socketId: session.socketId,
  577. room: `song.${data.currentSong.songId}`
  578. });
  579. data.currentSong.skipVotes = data.currentSong.skipVotes.length;
  580. return songs
  581. .runJob("GET_SONG_FROM_ID", {
  582. songId: data.currentSong.songId
  583. })
  584. .then(response => {
  585. const { song } = response;
  586. if (song) {
  587. data.currentSong.likes = song.likes;
  588. data.currentSong.dislikes = song.dislikes;
  589. } else {
  590. data.currentSong.likes = -1;
  591. data.currentSong.dislikes = -1;
  592. }
  593. })
  594. .catch(() => {
  595. data.currentSong.likes = -1;
  596. data.currentSong.dislikes = -1;
  597. })
  598. .finally(() => {
  599. next(null, data);
  600. });
  601. }
  602. ],
  603. async (err, data) => {
  604. if (err) {
  605. err = await utils.runJob("GET_ERROR", { error: err });
  606. console.log("ERROR", "STATIONS_JOIN", `Joining station "${stationName}" failed. "${err}"`);
  607. return cb({ status: "failure", message: err });
  608. }
  609. console.log("SUCCESS", "STATIONS_JOIN", `Joined station "${data._id}" successfully.`);
  610. return cb({ status: "success", data });
  611. }
  612. );
  613. },
  614. /**
  615. * Toggles if a station is locked
  616. *
  617. * @param session
  618. * @param stationId - the station id
  619. * @param cb
  620. */
  621. toggleLock: isOwnerRequired(async (session, stationId, cb) => {
  622. const stationModel = await db.runJob("GET_MODEL", {
  623. modelName: "station"
  624. });
  625. async.waterfall(
  626. [
  627. next => {
  628. stations
  629. .runJob("GET_STATION", { stationId })
  630. .then(station => {
  631. next(null, station);
  632. })
  633. .catch(next);
  634. },
  635. (station, next) => {
  636. stationModel.updateOne({ _id: stationId }, { $set: { locked: !station.locked } }, next);
  637. },
  638. (res, next) => {
  639. stations
  640. .runJob("UPDATE_STATION", { stationId })
  641. .then(station => {
  642. next(null, station);
  643. })
  644. .catch(next);
  645. }
  646. ],
  647. async (err, station) => {
  648. if (err) {
  649. err = await utils.runJob("GET_ERROR", { error: err });
  650. console.log(
  651. "ERROR",
  652. "STATIONS_UPDATE_LOCKED_STATUS",
  653. `Toggling the queue lock for station "${stationId}" failed. "${err}"`
  654. );
  655. return cb({ status: "failure", message: err });
  656. }
  657. console.log(
  658. "SUCCESS",
  659. "STATIONS_UPDATE_LOCKED_STATUS",
  660. `Toggled the queue lock for station "${stationId}" successfully to "${station.locked}".`
  661. );
  662. cache.runJob("PUB", {
  663. channel: "station.queueLockToggled",
  664. value: {
  665. stationId,
  666. locked: station.locked
  667. }
  668. });
  669. return cb({ status: "success", data: station.locked });
  670. }
  671. );
  672. }),
  673. /**
  674. * Votes to skip a station
  675. *
  676. * @param session
  677. * @param stationId - the station id
  678. * @param cb
  679. */
  680. voteSkip: isLoginRequired(async (session, stationId, cb) => {
  681. const stationModel = await db.runJob("GET_MODEL", {
  682. modelName: "station"
  683. });
  684. let skipVotes = 0;
  685. let shouldSkip = false;
  686. async.waterfall(
  687. [
  688. next => {
  689. stations
  690. .runJob("GET_STATION", { stationId })
  691. .then(station => {
  692. next(null, station);
  693. })
  694. .catch(next);
  695. },
  696. (station, next) => {
  697. if (!station) return next("Station not found.");
  698. return stations
  699. .runJob("CAN_USER_VIEW_STATION", {
  700. station,
  701. userId: session.userId
  702. })
  703. .then(canView => {
  704. if (canView) return next(null, station);
  705. return next("Insufficient permissions.");
  706. })
  707. .catch(err => next(err));
  708. },
  709. (station, next) => {
  710. if (!station.currentSong) return next("There is currently no song to skip.");
  711. if (station.currentSong.skipVotes.indexOf(session.userId) !== -1)
  712. return next("You have already voted to skip this song.");
  713. return next(null, station);
  714. },
  715. (station, next) => {
  716. stationModel.updateOne(
  717. { _id: stationId },
  718. { $push: { "currentSong.skipVotes": session.userId } },
  719. next
  720. );
  721. },
  722. (res, next) => {
  723. stations
  724. .runJob("UPDATE_STATION", { stationId })
  725. .then(station => {
  726. next(null, station);
  727. })
  728. .catch(next);
  729. },
  730. (station, next) => {
  731. if (!station) return next("Station not found.");
  732. return next(null, station);
  733. },
  734. (station, next) => {
  735. skipVotes = station.currentSong.skipVotes.length;
  736. utils
  737. .runJob("GET_ROOM_SOCKETS", {
  738. room: `station.${stationId}`
  739. })
  740. .then(sockets => {
  741. next(null, sockets);
  742. })
  743. .catch(next);
  744. },
  745. (sockets, next) => {
  746. if (sockets.length <= skipVotes) shouldSkip = true;
  747. next();
  748. }
  749. ],
  750. async err => {
  751. if (err) {
  752. err = await utils.runJob("GET_ERROR", { error: err });
  753. console.log("ERROR", "STATIONS_VOTE_SKIP", `Vote skipping station "${stationId}" failed. "${err}"`);
  754. return cb({ status: "failure", message: err });
  755. }
  756. console.log("SUCCESS", "STATIONS_VOTE_SKIP", `Vote skipping "${stationId}" successful.`);
  757. cache.runJob("PUB", {
  758. channel: "station.voteSkipSong",
  759. value: stationId
  760. });
  761. if (shouldSkip) stations.runJob("SKIP_STATION", { stationId });
  762. return cb({
  763. status: "success",
  764. message: "Successfully voted to skip the song."
  765. });
  766. }
  767. );
  768. }),
  769. /**
  770. * Force skips a station
  771. *
  772. * @param session
  773. * @param stationId - the station id
  774. * @param cb
  775. */
  776. forceSkip: isOwnerRequired((session, stationId, cb) => {
  777. async.waterfall(
  778. [
  779. next => {
  780. stations
  781. .runJob("GET_STATION", { stationId })
  782. .then(station => {
  783. next(null, station);
  784. })
  785. .catch(next);
  786. },
  787. (station, next) => {
  788. if (!station) return next("Station not found.");
  789. return next();
  790. }
  791. ],
  792. async err => {
  793. if (err) {
  794. err = await utils.runJob("GET_ERROR", { error: err });
  795. console.log(
  796. "ERROR",
  797. "STATIONS_FORCE_SKIP",
  798. `Force skipping station "${stationId}" failed. "${err}"`
  799. );
  800. return cb({ status: "failure", message: err });
  801. }
  802. notifications.runJob("UNSCHEDULE", {
  803. name: `stations.nextSong?id=${stationId}`
  804. });
  805. stations.runJob("SKIP_STATION", { stationId });
  806. console.log("SUCCESS", "STATIONS_FORCE_SKIP", `Force skipped station "${stationId}" successfully.`);
  807. return cb({
  808. status: "success",
  809. message: "Successfully skipped station."
  810. });
  811. }
  812. );
  813. }),
  814. /**
  815. * Leaves the user's current station
  816. *
  817. * @param {object} session - user session
  818. * @param {string} stationId - id of station to leave
  819. * @param {Function} cb - callback
  820. */
  821. leave: (session, stationId, cb) => {
  822. async.waterfall(
  823. [
  824. next => {
  825. stations
  826. .runJob("GET_STATION", { stationId })
  827. .then(station => {
  828. next(null, station);
  829. })
  830. .catch(next);
  831. },
  832. (station, next) => {
  833. if (!station) return next("Station not found.");
  834. return next();
  835. }
  836. ],
  837. async (err, userCount) => {
  838. if (err) {
  839. err = await utils.runJob("GET_ERROR", { error: err });
  840. console.log("ERROR", "STATIONS_LEAVE", `Leaving station "${stationId}" failed. "${err}"`);
  841. return cb({ status: "failure", message: err });
  842. }
  843. console.log("SUCCESS", "STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
  844. utils.runJob("SOCKET_LEAVE_ROOMS", { socketId: session });
  845. delete userList[session.socketId];
  846. return cb({
  847. status: "success",
  848. message: "Successfully left station.",
  849. userCount
  850. });
  851. }
  852. );
  853. },
  854. /**
  855. * Updates a station's name
  856. *
  857. * @param session
  858. * @param stationId - the station id
  859. * @param newName - the new station name
  860. * @param cb
  861. */
  862. updateName: isOwnerRequired(async (session, stationId, newName, cb) => {
  863. const stationModel = await db.runJob("GET_MODEL", {
  864. modelName: "station"
  865. });
  866. async.waterfall(
  867. [
  868. next => {
  869. stationModel.updateOne(
  870. { _id: stationId },
  871. { $set: { name: newName } },
  872. { runValidators: true },
  873. next
  874. );
  875. },
  876. (res, next) => {
  877. stations
  878. .runJob("UPDATE_STATION", { stationId })
  879. .then(station => {
  880. next(null, station);
  881. })
  882. .catch(next);
  883. }
  884. ],
  885. async err => {
  886. if (err) {
  887. err = await utils.runJob("GET_ERROR", { error: err });
  888. console.log(
  889. "ERROR",
  890. "STATIONS_UPDATE_NAME",
  891. `Updating station "${stationId}" name to "${newName}" failed. "${err}"`
  892. );
  893. return cb({ status: "failure", message: err });
  894. }
  895. console.log(
  896. "SUCCESS",
  897. "STATIONS_UPDATE_NAME",
  898. `Updated station "${stationId}" name to "${newName}" successfully.`
  899. );
  900. return cb({
  901. status: "success",
  902. message: "Successfully updated the name."
  903. });
  904. }
  905. );
  906. }),
  907. /**
  908. * Updates a station's display name
  909. *
  910. * @param session
  911. * @param stationId - the station id
  912. * @param newDisplayName - the new station display name
  913. * @param cb
  914. */
  915. updateDisplayName: isOwnerRequired(async (session, stationId, newDisplayName, cb) => {
  916. const stationModel = await db.runJob("GET_MODEL", {
  917. modelName: "station"
  918. });
  919. async.waterfall(
  920. [
  921. next => {
  922. stationModel.updateOne(
  923. { _id: stationId },
  924. { $set: { displayName: newDisplayName } },
  925. { runValidators: true },
  926. next
  927. );
  928. },
  929. (res, next) => {
  930. stations
  931. .runJob("UPDATE_STATION", { stationId })
  932. .then(station => {
  933. next(null, station);
  934. })
  935. .catch(next);
  936. }
  937. ],
  938. async err => {
  939. if (err) {
  940. err = await utils.runJob("GET_ERROR", { error: err });
  941. console.log(
  942. "ERROR",
  943. "STATIONS_UPDATE_DISPLAY_NAME",
  944. `Updating station "${stationId}" displayName to "${newDisplayName}" failed. "${err}"`
  945. );
  946. return cb({ status: "failure", message: err });
  947. }
  948. console.log(
  949. "SUCCESS",
  950. "STATIONS_UPDATE_DISPLAY_NAME",
  951. `Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`
  952. );
  953. return cb({
  954. status: "success",
  955. message: "Successfully updated the display name."
  956. });
  957. }
  958. );
  959. }),
  960. /**
  961. * Updates a station's description
  962. *
  963. * @param session
  964. * @param stationId - the station id
  965. * @param newDescription - the new station description
  966. * @param cb
  967. */
  968. updateDescription: isOwnerRequired(async (session, stationId, newDescription, cb) => {
  969. const stationModel = await db.runJob("GET_MODEL", {
  970. modelName: "station"
  971. });
  972. async.waterfall(
  973. [
  974. next => {
  975. stationModel.updateOne(
  976. { _id: stationId },
  977. { $set: { description: newDescription } },
  978. { runValidators: true },
  979. next
  980. );
  981. },
  982. (res, next) => {
  983. stations
  984. .runJob("UPDATE_STATION", { stationId })
  985. .then(station => {
  986. next(null, station);
  987. })
  988. .catch(next);
  989. }
  990. ],
  991. async err => {
  992. if (err) {
  993. err = await utils.runJob("GET_ERROR", { error: err });
  994. console.log(
  995. "ERROR",
  996. "STATIONS_UPDATE_DESCRIPTION",
  997. `Updating station "${stationId}" description to "${newDescription}" failed. "${err}"`
  998. );
  999. return cb({ status: "failure", message: err });
  1000. }
  1001. console.log(
  1002. "SUCCESS",
  1003. "STATIONS_UPDATE_DESCRIPTION",
  1004. `Updated station "${stationId}" description to "${newDescription}" successfully.`
  1005. );
  1006. return cb({
  1007. status: "success",
  1008. message: "Successfully updated the description."
  1009. });
  1010. }
  1011. );
  1012. }),
  1013. /**
  1014. * Updates a station's privacy
  1015. *
  1016. * @param session
  1017. * @param stationId - the station id
  1018. * @param newPrivacy - the new station privacy
  1019. * @param cb
  1020. */
  1021. updatePrivacy: isOwnerRequired(async (session, stationId, newPrivacy, cb) => {
  1022. const stationModel = await db.runJob("GET_MODEL", {
  1023. modelName: "station"
  1024. });
  1025. async.waterfall(
  1026. [
  1027. next => {
  1028. stationModel.updateOne(
  1029. { _id: stationId },
  1030. { $set: { privacy: newPrivacy } },
  1031. { runValidators: true },
  1032. next
  1033. );
  1034. },
  1035. (res, next) => {
  1036. stations
  1037. .runJob("UPDATE_STATION", { stationId })
  1038. .then(station => {
  1039. next(null, station);
  1040. })
  1041. .catch(next);
  1042. }
  1043. ],
  1044. async err => {
  1045. if (err) {
  1046. err = await utils.runJob("GET_ERROR", { error: err });
  1047. console.log(
  1048. "ERROR",
  1049. "STATIONS_UPDATE_PRIVACY",
  1050. `Updating station "${stationId}" privacy to "${newPrivacy}" failed. "${err}"`
  1051. );
  1052. return cb({ status: "failure", message: err });
  1053. }
  1054. console.log(
  1055. "SUCCESS",
  1056. "STATIONS_UPDATE_PRIVACY",
  1057. `Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`
  1058. );
  1059. return cb({
  1060. status: "success",
  1061. message: "Successfully updated the privacy."
  1062. });
  1063. }
  1064. );
  1065. }),
  1066. /**
  1067. * Updates a station's genres
  1068. *
  1069. * @param session
  1070. * @param stationId - the station id
  1071. * @param newGenres - the new station genres
  1072. * @param cb
  1073. */
  1074. updateGenres: isOwnerRequired(async (session, stationId, newGenres, cb) => {
  1075. const stationModel = await db.runJob("GET_MODEL", {
  1076. modelName: "station"
  1077. });
  1078. async.waterfall(
  1079. [
  1080. next => {
  1081. stationModel.updateOne(
  1082. { _id: stationId },
  1083. { $set: { genres: newGenres } },
  1084. { runValidators: true },
  1085. next
  1086. );
  1087. },
  1088. (res, next) => {
  1089. stations
  1090. .runJob("UPDATE_STATION", { stationId })
  1091. .then(station => {
  1092. next(null, station);
  1093. })
  1094. .catch(next);
  1095. }
  1096. ],
  1097. async err => {
  1098. if (err) {
  1099. err = await utils.runJob("GET_ERROR", { error: err });
  1100. console.log(
  1101. "ERROR",
  1102. "STATIONS_UPDATE_GENRES",
  1103. `Updating station "${stationId}" genres to "${newGenres}" failed. "${err}"`
  1104. );
  1105. return cb({ status: "failure", message: err });
  1106. }
  1107. console.log(
  1108. "SUCCESS",
  1109. "STATIONS_UPDATE_GENRES",
  1110. `Updated station "${stationId}" genres to "${newGenres}" successfully.`
  1111. );
  1112. return cb({
  1113. status: "success",
  1114. message: "Successfully updated the genres."
  1115. });
  1116. }
  1117. );
  1118. }),
  1119. /**
  1120. * Updates a station's blacklisted genres
  1121. *
  1122. * @param session
  1123. * @param stationId - the station id
  1124. * @param newBlacklistedGenres - the new station blacklisted genres
  1125. * @param cb
  1126. */
  1127. updateBlacklistedGenres: isOwnerRequired(async (session, stationId, newBlacklistedGenres, cb) => {
  1128. const stationModel = await db.runJob("GET_MODEL", {
  1129. modelName: "station"
  1130. });
  1131. async.waterfall(
  1132. [
  1133. next => {
  1134. stationModel.updateOne(
  1135. { _id: stationId },
  1136. {
  1137. $set: {
  1138. blacklistedGenres: newBlacklistedGenres
  1139. }
  1140. },
  1141. { runValidators: true },
  1142. next
  1143. );
  1144. },
  1145. (res, next) => {
  1146. stations
  1147. .runJob("UPDATE_STATION", { stationId })
  1148. .then(station => {
  1149. next(null, station);
  1150. })
  1151. .catch(next);
  1152. }
  1153. ],
  1154. async err => {
  1155. if (err) {
  1156. err = await utils.runJob("GET_ERROR", { error: err });
  1157. console.log(
  1158. "ERROR",
  1159. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1160. `Updating station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" failed. "${err}"`
  1161. );
  1162. return cb({ status: "failure", message: err });
  1163. }
  1164. console.log(
  1165. "SUCCESS",
  1166. "STATIONS_UPDATE_BLACKLISTED_GENRES",
  1167. `Updated station "${stationId}" blacklisted genres to "${newBlacklistedGenres}" successfully.`
  1168. );
  1169. return cb({
  1170. status: "success",
  1171. message: "Successfully updated the blacklisted genres."
  1172. });
  1173. }
  1174. );
  1175. }),
  1176. /**
  1177. * Updates a station's party mode
  1178. *
  1179. * @param session
  1180. * @param stationId - the station id
  1181. * @param newPartyMode - the new station party mode
  1182. * @param cb
  1183. */
  1184. updatePartyMode: isOwnerRequired(async (session, stationId, newPartyMode, cb) => {
  1185. const stationModel = await db.runJob("GET_MODEL", {
  1186. modelName: "station"
  1187. });
  1188. async.waterfall(
  1189. [
  1190. next => {
  1191. stations
  1192. .runJob("GET_STATION", { stationId })
  1193. .then(station => {
  1194. next(null, station);
  1195. })
  1196. .catch(next);
  1197. },
  1198. (station, next) => {
  1199. if (!station) return next("Station not found.");
  1200. if (station.partyMode === newPartyMode)
  1201. return next(`The party mode was already ${newPartyMode ? "enabled." : "disabled."}`);
  1202. return stationModel.updateOne(
  1203. { _id: stationId },
  1204. { $set: { partyMode: newPartyMode } },
  1205. { runValidators: true },
  1206. next
  1207. );
  1208. },
  1209. (res, next) => {
  1210. stations
  1211. .runJob("UPDATE_STATION", { stationId })
  1212. .then(station => {
  1213. next(null, station);
  1214. })
  1215. .catch(next);
  1216. }
  1217. ],
  1218. async err => {
  1219. if (err) {
  1220. err = await utils.runJob("GET_ERROR", { error: err });
  1221. console.log(
  1222. "ERROR",
  1223. "STATIONS_UPDATE_PARTY_MODE",
  1224. `Updating station "${stationId}" party mode to "${newPartyMode}" failed. "${err}"`
  1225. );
  1226. return cb({ status: "failure", message: err });
  1227. }
  1228. console.log(
  1229. "SUCCESS",
  1230. "STATIONS_UPDATE_PARTY_MODE",
  1231. `Updated station "${stationId}" party mode to "${newPartyMode}" successfully.`
  1232. );
  1233. cache.runJob("PUB", {
  1234. channel: "station.updatePartyMode",
  1235. value: {
  1236. stationId,
  1237. partyMode: newPartyMode
  1238. }
  1239. });
  1240. stations.runJob("SKIP_STATION", { stationId });
  1241. return cb({
  1242. status: "success",
  1243. message: "Successfully updated the party mode."
  1244. });
  1245. }
  1246. );
  1247. }),
  1248. /**
  1249. * Pauses a station
  1250. *
  1251. * @param session
  1252. * @param stationId - the station id
  1253. * @param cb
  1254. */
  1255. pause: isOwnerRequired(async (session, stationId, cb) => {
  1256. const stationModel = await db.runJob("GET_MODEL", {
  1257. modelName: "station"
  1258. });
  1259. async.waterfall(
  1260. [
  1261. next => {
  1262. stations
  1263. .runJob("GET_STATION", { stationId })
  1264. .then(station => {
  1265. next(null, station);
  1266. })
  1267. .catch(next);
  1268. },
  1269. (station, next) => {
  1270. if (!station) return next("Station not found.");
  1271. if (station.paused) return next("That station was already paused.");
  1272. return stationModel.updateOne(
  1273. { _id: stationId },
  1274. { $set: { paused: true, pausedAt: Date.now() } },
  1275. next
  1276. );
  1277. },
  1278. (res, next) => {
  1279. stations
  1280. .runJob("UPDATE_STATION", { stationId })
  1281. .then(station => {
  1282. next(null, station);
  1283. })
  1284. .catch(next);
  1285. }
  1286. ],
  1287. async err => {
  1288. if (err) {
  1289. err = await utils.runJob("GET_ERROR", { error: err });
  1290. console.log("ERROR", "STATIONS_PAUSE", `Pausing station "${stationId}" failed. "${err}"`);
  1291. return cb({ status: "failure", message: err });
  1292. }
  1293. console.log("SUCCESS", "STATIONS_PAUSE", `Paused station "${stationId}" successfully.`);
  1294. cache.runJob("PUB", {
  1295. channel: "station.pause",
  1296. value: stationId
  1297. });
  1298. notifications.runJob("UNSCHEDULE", {
  1299. name: `stations.nextSong?id=${stationId}`
  1300. });
  1301. return cb({
  1302. status: "success",
  1303. message: "Successfully paused."
  1304. });
  1305. }
  1306. );
  1307. }),
  1308. /**
  1309. * Resumes a station
  1310. *
  1311. * @param session
  1312. * @param stationId - the station id
  1313. * @param cb
  1314. */
  1315. resume: isOwnerRequired(async (session, stationId, cb) => {
  1316. const stationModel = await db.runJob("GET_MODEL", {
  1317. modelName: "station"
  1318. });
  1319. async.waterfall(
  1320. [
  1321. next => {
  1322. stations
  1323. .runJob("GET_STATION", { stationId })
  1324. .then(station => {
  1325. next(null, station);
  1326. })
  1327. .catch(next);
  1328. },
  1329. (station, next) => {
  1330. if (!station) return next("Station not found.");
  1331. if (!station.paused) return next("That station is not paused.");
  1332. station.timePaused += Date.now() - station.pausedAt;
  1333. return stationModel.updateOne(
  1334. { _id: stationId },
  1335. {
  1336. $set: { paused: false },
  1337. $inc: { timePaused: Date.now() - station.pausedAt }
  1338. },
  1339. next
  1340. );
  1341. },
  1342. (res, next) => {
  1343. stations
  1344. .runJob("UPDATE_STATION", { stationId })
  1345. .then(station => {
  1346. next(null, station);
  1347. })
  1348. .catch(next);
  1349. }
  1350. ],
  1351. async err => {
  1352. if (err) {
  1353. err = await utils.runJob("GET_ERROR", { error: err });
  1354. console.log("ERROR", "STATIONS_RESUME", `Resuming station "${stationId}" failed. "${err}"`);
  1355. return cb({ status: "failure", message: err });
  1356. }
  1357. console.log("SUCCESS", "STATIONS_RESUME", `Resuming station "${stationId}" successfully.`);
  1358. cache.runJob("PUB", {
  1359. channel: "station.resume",
  1360. value: stationId
  1361. });
  1362. return cb({
  1363. status: "success",
  1364. message: "Successfully resumed."
  1365. });
  1366. }
  1367. );
  1368. }),
  1369. /**
  1370. * Removes a station
  1371. *
  1372. * @param session
  1373. * @param stationId - the station id
  1374. * @param cb
  1375. */
  1376. remove: isOwnerRequired(async (session, stationId, cb) => {
  1377. const stationModel = await db.runJob("GET_MODEL", {
  1378. modelName: "station"
  1379. });
  1380. async.waterfall(
  1381. [
  1382. next => {
  1383. stationModel.deleteOne({ _id: stationId }, err => next(err));
  1384. },
  1385. next => {
  1386. cache.runJob("HDEL", { table: "stations", key: stationId }).then(next).catch(next);
  1387. }
  1388. ],
  1389. async err => {
  1390. if (err) {
  1391. err = await utils.runJob("GET_ERROR", { error: err });
  1392. console.log("ERROR", "STATIONS_REMOVE", `Removing station "${stationId}" failed. "${err}"`);
  1393. return cb({ status: "failure", message: err });
  1394. }
  1395. console.log("SUCCESS", "STATIONS_REMOVE", `Removing station "${stationId}" successfully.`);
  1396. cache.runJob("PUB", {
  1397. channel: "station.remove",
  1398. value: stationId
  1399. });
  1400. activities.runJob("ADD_ACTIVITY", {
  1401. userId: session.userId,
  1402. activityType: "deleted_station",
  1403. payload: [stationId]
  1404. });
  1405. return cb({
  1406. status: "success",
  1407. message: "Successfully removed."
  1408. });
  1409. }
  1410. );
  1411. }),
  1412. /**
  1413. * Create a station
  1414. *
  1415. * @param session
  1416. * @param data - the station data
  1417. * @param cb
  1418. */
  1419. create: isLoginRequired(async (session, data, cb) => {
  1420. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  1421. const stationModel = await db.runJob("GET_MODEL", {
  1422. modelName: "station"
  1423. });
  1424. data.name = data.name.toLowerCase();
  1425. const blacklist = [
  1426. "country",
  1427. "edm",
  1428. "musare",
  1429. "hip-hop",
  1430. "rap",
  1431. "top-hits",
  1432. "todays-hits",
  1433. "old-school",
  1434. "christmas",
  1435. "about",
  1436. "support",
  1437. "staff",
  1438. "help",
  1439. "news",
  1440. "terms",
  1441. "privacy",
  1442. "profile",
  1443. "c",
  1444. "community",
  1445. "tos",
  1446. "login",
  1447. "register",
  1448. "p",
  1449. "official",
  1450. "o",
  1451. "trap",
  1452. "faq",
  1453. "team",
  1454. "donate",
  1455. "buy",
  1456. "shop",
  1457. "forums",
  1458. "explore",
  1459. "settings",
  1460. "admin",
  1461. "auth",
  1462. "reset_password"
  1463. ];
  1464. async.waterfall(
  1465. [
  1466. next => {
  1467. if (!data) return next("Invalid data.");
  1468. return next();
  1469. },
  1470. next => {
  1471. stationModel.findOne(
  1472. {
  1473. $or: [
  1474. { name: data.name },
  1475. {
  1476. displayName: new RegExp(`^${data.displayName}$`, "i")
  1477. }
  1478. ]
  1479. },
  1480. next
  1481. );
  1482. },
  1483. // eslint-disable-next-line consistent-return
  1484. (station, next) => {
  1485. console.log(station);
  1486. if (station) return next("A station with that name or display name already exists.");
  1487. const { name, displayName, description, genres, playlist, type, blacklistedGenres } = data;
  1488. if (type === "official") {
  1489. return userModel.findOne({ _id: session.userId }, (err, user) => {
  1490. if (err) return next(err);
  1491. if (!user) return next("User not found.");
  1492. if (user.role !== "admin") return next("Admin required.");
  1493. return stationModel.create(
  1494. {
  1495. name,
  1496. displayName,
  1497. description,
  1498. type,
  1499. privacy: "private",
  1500. playlist,
  1501. genres,
  1502. blacklistedGenres,
  1503. currentSong: stations.defaultSong
  1504. },
  1505. next
  1506. );
  1507. });
  1508. }
  1509. if (type === "community") {
  1510. if (blacklist.indexOf(name) !== -1)
  1511. return next("That name is blacklisted. Please use a different name.");
  1512. return stationModel.create(
  1513. {
  1514. name,
  1515. displayName,
  1516. description,
  1517. type,
  1518. privacy: "private",
  1519. owner: session.userId,
  1520. queue: [],
  1521. currentSong: null
  1522. },
  1523. next
  1524. );
  1525. }
  1526. }
  1527. ],
  1528. async (err, station) => {
  1529. if (err) {
  1530. err = await utils.runJob("GET_ERROR", { error: err });
  1531. console.log("ERROR", "STATIONS_CREATE", `Creating station failed. "${err}"`);
  1532. return cb({ status: "failure", message: err });
  1533. }
  1534. console.log("SUCCESS", "STATIONS_CREATE", `Created station "${station._id}" successfully.`);
  1535. cache.runJob("PUB", {
  1536. channel: "station.create",
  1537. value: station._id
  1538. });
  1539. activities.runJob("ADD_ACTIVITY", {
  1540. userId: session.userId,
  1541. activityType: "created_station",
  1542. payload: [station._id]
  1543. });
  1544. return cb({
  1545. status: "success",
  1546. message: "Successfully created station."
  1547. });
  1548. }
  1549. );
  1550. }),
  1551. /**
  1552. * Adds song to station queue
  1553. *
  1554. * @param session
  1555. * @param stationId - the station id
  1556. * @param songId - the song id
  1557. * @param cb
  1558. */
  1559. addToQueue: isLoginRequired(async (session, stationId, songId, cb) => {
  1560. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  1561. const stationModel = await db.runJob("GET_MODEL", {
  1562. modelName: "station"
  1563. });
  1564. async.waterfall(
  1565. [
  1566. next => {
  1567. stations
  1568. .runJob("GET_STATION", { stationId })
  1569. .then(station => {
  1570. next(null, station);
  1571. })
  1572. .catch(next);
  1573. },
  1574. (station, next) => {
  1575. if (!station) return next("Station not found.");
  1576. if (station.locked) {
  1577. return userModel.findOne({ _id: session.userId }, (err, user) => {
  1578. if (user.role !== "admin" && station.owner !== session.userId)
  1579. return next("Only owners and admins can add songs to a locked queue.");
  1580. return next(null, station);
  1581. });
  1582. }
  1583. return next(null, station);
  1584. },
  1585. (station, next) => {
  1586. if (station.type !== "community") return next("That station is not a community station.");
  1587. return stations
  1588. .runJob("CAN_USER_VIEW_STATION", {
  1589. station,
  1590. userId: session.userId
  1591. })
  1592. .then(canView => {
  1593. if (canView) return next(null, station);
  1594. return next("Insufficient permissions.");
  1595. })
  1596. .catch(err => next(err));
  1597. },
  1598. (station, next) => {
  1599. if (station.currentSong && station.currentSong.songId === songId)
  1600. return next("That song is currently playing.");
  1601. return async.each(
  1602. station.queue,
  1603. (queueSong, next) => {
  1604. if (queueSong.songId === songId) return next("That song is already in the queue.");
  1605. return next();
  1606. },
  1607. err => next(err, station)
  1608. );
  1609. },
  1610. (station, next) => {
  1611. songs
  1612. .runJob("GET_SONG_FROM_ID", { songId })
  1613. .then(res => {
  1614. if (res.song) return next(null, res.song, station);
  1615. return YouTubeModule.runJob("GET_SONG", { songId })
  1616. .then(response => {
  1617. const { song } = response;
  1618. song.artists = [];
  1619. song.skipDuration = 0;
  1620. song.likes = -1;
  1621. song.dislikes = -1;
  1622. song.thumbnail = "empty";
  1623. song.explicit = false;
  1624. return next(null, song, station);
  1625. })
  1626. .catch(err => next(err));
  1627. })
  1628. .catch(err => next(err));
  1629. },
  1630. (song, station, next) => {
  1631. const { queue } = station;
  1632. song.requestedBy = session.userId;
  1633. queue.push(song);
  1634. let totalDuration = 0;
  1635. queue.forEach(song => {
  1636. totalDuration += song.duration;
  1637. });
  1638. if (totalDuration >= 3600 * 3) return next("The max length of the queue is 3 hours.");
  1639. return next(null, song, station);
  1640. },
  1641. (song, station, next) => {
  1642. const { queue } = station;
  1643. if (queue.length === 0) return next(null, song, station);
  1644. let totalDuration = 0;
  1645. const userId = queue[queue.length - 1].requestedBy;
  1646. station.queue.forEach(song => {
  1647. if (userId === song.requestedBy) {
  1648. totalDuration += song.duration;
  1649. }
  1650. });
  1651. if (totalDuration >= 900) return next("The max length of songs per user is 15 minutes.");
  1652. return next(null, song, station);
  1653. },
  1654. (song, station, next) => {
  1655. const { queue } = station;
  1656. if (queue.length === 0) return next(null, song);
  1657. let totalSongs = 0;
  1658. const userId = queue[queue.length - 1].requestedBy;
  1659. queue.forEach(song => {
  1660. if (userId === song.requestedBy) {
  1661. totalSongs += 1;
  1662. }
  1663. });
  1664. if (totalSongs <= 2) return next(null, song);
  1665. if (totalSongs > 3)
  1666. return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  1667. if (queue[queue.length - 2].requestedBy !== userId || queue[queue.length - 3] !== userId)
  1668. return next("The max amount of songs per user is 3, and only 2 in a row is allowed.");
  1669. return next(null, song);
  1670. },
  1671. (song, next) => {
  1672. stationModel.updateOne(
  1673. { _id: stationId },
  1674. { $push: { queue: song } },
  1675. { runValidators: true },
  1676. next
  1677. );
  1678. },
  1679. (res, next) => {
  1680. stations
  1681. .runJob("UPDATE_STATION", { stationId })
  1682. .then(station => {
  1683. next(null, station);
  1684. })
  1685. .catch(next);
  1686. }
  1687. ],
  1688. async err => {
  1689. if (err) {
  1690. err = await utils.runJob("GET_ERROR", { error: err });
  1691. console.log(
  1692. "ERROR",
  1693. "STATIONS_ADD_SONG_TO_QUEUE",
  1694. `Adding song "${songId}" to station "${stationId}" queue failed. "${err}"`
  1695. );
  1696. return cb({ status: "failure", message: err });
  1697. }
  1698. console.log(
  1699. "SUCCESS",
  1700. "STATIONS_ADD_SONG_TO_QUEUE",
  1701. `Added song "${songId}" to station "${stationId}" successfully.`
  1702. );
  1703. cache.runJob("PUB", {
  1704. channel: "station.queueUpdate",
  1705. value: stationId
  1706. });
  1707. return cb({
  1708. status: "success",
  1709. message: "Successfully added song to queue."
  1710. });
  1711. }
  1712. );
  1713. }),
  1714. /**
  1715. * Removes song from station queue
  1716. *
  1717. * @param session
  1718. * @param stationId - the station id
  1719. * @param songId - the song id
  1720. * @param cb
  1721. */
  1722. removeFromQueue: isOwnerRequired(async (session, stationId, songId, cb) => {
  1723. const stationModel = await db.runJob("GET_MODEL", {
  1724. modelName: "station"
  1725. });
  1726. async.waterfall(
  1727. [
  1728. next => {
  1729. if (!songId) return next("Invalid song id.");
  1730. return stations
  1731. .runJob("GET_STATION", { stationId })
  1732. .then(station => {
  1733. next(null, station);
  1734. })
  1735. .catch(next);
  1736. },
  1737. (station, next) => {
  1738. if (!station) return next("Station not found.");
  1739. if (station.type !== "community") return next("Station is not a community station.");
  1740. return async.each(
  1741. station.queue,
  1742. (queueSong, next) => {
  1743. if (queueSong.songId === songId) return next(true);
  1744. return next();
  1745. },
  1746. err => {
  1747. if (err === true) return next();
  1748. return next("Song is not currently in the queue.");
  1749. }
  1750. );
  1751. },
  1752. next => {
  1753. stationModel.updateOne({ _id: stationId }, { $pull: { queue: { songId } } }, next);
  1754. },
  1755. (res, next) => {
  1756. stations
  1757. .runJob("UPDATE_STATION", { stationId })
  1758. .then(station => {
  1759. next(null, station);
  1760. })
  1761. .catch(next);
  1762. }
  1763. ],
  1764. async err => {
  1765. if (err) {
  1766. err = await utils.runJob("GET_ERROR", { error: err });
  1767. console.log(
  1768. "ERROR",
  1769. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1770. `Removing song "${songId}" from station "${stationId}" queue failed. "${err}"`
  1771. );
  1772. return cb({ status: "failure", message: err });
  1773. }
  1774. console.log(
  1775. "SUCCESS",
  1776. "STATIONS_REMOVE_SONG_TO_QUEUE",
  1777. `Removed song "${songId}" from station "${stationId}" successfully.`
  1778. );
  1779. cache.runJob("PUB", {
  1780. channel: "station.queueUpdate",
  1781. value: stationId
  1782. });
  1783. return cb({
  1784. status: "success",
  1785. message: "Successfully removed song from queue."
  1786. });
  1787. }
  1788. );
  1789. }),
  1790. /**
  1791. * Gets the queue from a station
  1792. *
  1793. * @param {object} session - user session
  1794. * @param {string} stationId - the station id
  1795. * @param {Function} cb - callback
  1796. */
  1797. getQueue: (session, stationId, cb) => {
  1798. async.waterfall(
  1799. [
  1800. next => {
  1801. stations
  1802. .runJob("GET_STATION", { stationId })
  1803. .then(station => {
  1804. next(null, station);
  1805. })
  1806. .catch(next);
  1807. },
  1808. (station, next) => {
  1809. if (!station) return next("Station not found.");
  1810. if (station.type !== "community") return next("Station is not a community station.");
  1811. return next(null, station);
  1812. },
  1813. (station, next) => {
  1814. stations
  1815. .runJob("CAN_USER_VIEW_STATION", {
  1816. station,
  1817. userId: session.userId
  1818. })
  1819. .then(canView => {
  1820. if (canView) return next(null, station);
  1821. return next("Insufficient permissions.");
  1822. })
  1823. .catch(err => next(err));
  1824. }
  1825. ],
  1826. async (err, station) => {
  1827. if (err) {
  1828. err = await utils.runJob("GET_ERROR", { error: err });
  1829. console.log(
  1830. "ERROR",
  1831. "STATIONS_GET_QUEUE",
  1832. `Getting queue for station "${stationId}" failed. "${err}"`
  1833. );
  1834. return cb({ status: "failure", message: err });
  1835. }
  1836. console.log("SUCCESS", "STATIONS_GET_QUEUE", `Got queue for station "${stationId}" successfully.`);
  1837. return cb({
  1838. status: "success",
  1839. message: "Successfully got queue.",
  1840. queue: station.queue
  1841. });
  1842. }
  1843. );
  1844. },
  1845. /**
  1846. * Selects a private playlist for a station
  1847. *
  1848. * @param session
  1849. * @param stationId - the station id
  1850. * @param playlistId - the private playlist id
  1851. * @param cb
  1852. */
  1853. selectPrivatePlaylist: isOwnerRequired(async (session, stationId, playlistId, cb) => {
  1854. const stationModel = await db.runJob("GET_MODEL", {
  1855. modelName: "station"
  1856. });
  1857. const playlistModel = await db.runJob("GET_MODEL", {
  1858. modelName: "playlist"
  1859. });
  1860. async.waterfall(
  1861. [
  1862. next => {
  1863. stations
  1864. .runJob("GET_STATION", { stationId })
  1865. .then(station => {
  1866. next(null, station);
  1867. })
  1868. .catch(next);
  1869. },
  1870. (station, next) => {
  1871. if (!station) return next("Station not found.");
  1872. if (station.type !== "community") return next("Station is not a community station.");
  1873. if (station.privatePlaylist === playlistId)
  1874. return next("That private playlist is already selected.");
  1875. return playlistModel.findOne({ _id: playlistId }, next);
  1876. },
  1877. (playlist, next) => {
  1878. if (!playlist) return next("Playlist not found.");
  1879. const currentSongIndex = playlist.songs.length > 0 ? playlist.songs.length - 1 : 0;
  1880. return stationModel.updateOne(
  1881. { _id: stationId },
  1882. {
  1883. $set: {
  1884. privatePlaylist: playlistId,
  1885. currentSongIndex
  1886. }
  1887. },
  1888. { runValidators: true },
  1889. next
  1890. );
  1891. },
  1892. (res, next) => {
  1893. stations
  1894. .runJob("UPDATE_STATION", { stationId })
  1895. .then(station => {
  1896. next(null, station);
  1897. })
  1898. .catch(next);
  1899. }
  1900. ],
  1901. async (err, station) => {
  1902. if (err) {
  1903. err = await utils.runJob("GET_ERROR", { error: err });
  1904. console.log(
  1905. "ERROR",
  1906. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  1907. `Selecting private playlist "${playlistId}" for station "${stationId}" failed. "${err}"`
  1908. );
  1909. return cb({ status: "failure", message: err });
  1910. }
  1911. console.log(
  1912. "SUCCESS",
  1913. "STATIONS_SELECT_PRIVATE_PLAYLIST",
  1914. `Selected private playlist "${playlistId}" for station "${stationId}" successfully.`
  1915. );
  1916. notifications.runJob("UNSCHEDULE", {
  1917. name: `stations.nextSong?id${stationId}`
  1918. });
  1919. if (!station.partyMode) stations.runJob("SKIP_STATION", { stationId });
  1920. cache.runJob("PUB", {
  1921. channel: "privatePlaylist.selected",
  1922. value: {
  1923. playlistId,
  1924. stationId
  1925. }
  1926. });
  1927. return cb({
  1928. status: "success",
  1929. message: "Successfully selected playlist."
  1930. });
  1931. }
  1932. );
  1933. }),
  1934. favoriteStation: isLoginRequired(async (session, stationId, cb) => {
  1935. const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
  1936. async.waterfall(
  1937. [
  1938. next => {
  1939. stations
  1940. .runJob("GET_STATION", { stationId })
  1941. .then(station => {
  1942. next(null, station);
  1943. })
  1944. .catch(next);
  1945. },
  1946. (station, next) => {
  1947. if (!station) return next("Station not found.");
  1948. return stations
  1949. .runJob("CAN_USER_VIEW_STATION", {
  1950. station,
  1951. userId: session.userId
  1952. })
  1953. .then(canView => {
  1954. if (canView) return next();
  1955. return next("Insufficient permissions.");
  1956. })
  1957. .catch(err => next(err));
  1958. },
  1959. next => {
  1960. userModel.updateOne({ _id: session.userId }, { $addToSet: { favoriteStations: stationId } }, next);
  1961. },
  1962. (res, next) => {
  1963. if (res.nModified === 0) return next("The station was already favorited.");
  1964. return next();
  1965. }
  1966. ],
  1967. async err => {
  1968. if (err) {
  1969. err = await utils.runJob("GET_ERROR", { error: err });
  1970. console.log("ERROR", "FAVORITE_STATION", `Favoriting station "${stationId}" failed. "${err}"`);
  1971. return cb({ status: "failure", message: err });
  1972. }
  1973. console.log("SUCCESS", "FAVORITE_STATION", `Favorited station "${stationId}" successfully.`);
  1974. cache.runJob("PUB", {
  1975. channel: "user.favoritedStation",
  1976. value: {
  1977. userId: session.userId,
  1978. stationId
  1979. }
  1980. });
  1981. return cb({
  1982. status: "success",
  1983. message: "Succesfully favorited station."
  1984. });
  1985. }
  1986. );
  1987. }),
  /**
   * Removes a station from the requesting user's favorite stations.
   *
   * The handler is wrapped in `isLoginRequired`. On success a
   * `user.unfavoritedStation` event is published.
   *
   * @param session - the session of the requesting user; `session.userId` is the user that is updated
   * @param stationId - the id of the station to unfavorite
   * @param cb - called with `{ status, message }` once the operation finishes
   */
  unfavoriteStation: isLoginRequired(async (session, stationId, cb) => {
    const userModel = await db.runJob("GET_MODEL", { modelName: "user" });
    async.waterfall(
      [
        next => {
          userModel.updateOne({ _id: session.userId }, { $pull: { favoriteStations: stationId } }, next);
        },
        (res, next) => {
          // Nothing modified is reported as the station not having been a favorite.
          if (res.nModified === 0) return next("The station wasn't favorited.");
          return next();
        }
      ],
      async err => {
        if (err) {
          err = await utils.runJob("GET_ERROR", { error: err });
          console.log("ERROR", "UNFAVORITE_STATION", `Unfavoriting station "${stationId}" failed. "${err}"`);
          return cb({ status: "failure", message: err });
        }
        console.log("SUCCESS", "UNFAVORITE_STATION", `Unfavorited station "${stationId}" successfully.`);
        cache.runJob("PUB", {
          channel: "user.unfavoritedStation",
          value: {
            userId: session.userId,
            stationId
          }
        });
        return cb({
          status: "success",
          // Fixed the misspelling "Succesfully" in the user-facing message.
          message: "Successfully unfavorited station."
        });
      }
    );
  })
  2021. };