stations.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. 'use strict';
  2. const cache = require('./cache');
  3. const db = require('./db');
  4. const io = require('./io');
  5. const utils = require('./utils');
  6. const logger = require('./logger');
  7. const songs = require('./songs');
  8. const notifications = require('./notifications');
  9. const async = require('async');
  10. let initialized = false;
  11. let lockdown = false;
  // TEMP
  // On 'station.pause': remove the `stations.nextSong?id=<stationId>` notification.
  cache.sub('station.pause', (pausedStationId) => {
    // Subscribers do nothing once the module has been locked down.
    if (!lockdown) {
      notifications.remove(`stations.nextSong?id=${pausedStationId}`);
    }
  });
  // On 'station.resume': run initializeStation for the station again (no callback is passed).
  cache.sub('station.resume', (resumedStationId) => {
    // Subscribers do nothing once the module has been locked down.
    if (!lockdown) {
      module.exports.initializeStation(resumedStationId);
    }
  });
  // On 'station.queueUpdate': if the station is currently playing nothing but has
  // songs queued, (re)initialize it so the first queued song starts.
  cache.sub('station.queueUpdate', (stationId) => {
    if (lockdown) return;
    module.exports.getStation(stationId, (err, station) => {
      // getStation reports failures through `err` and passes no station in that
      // case; bail out instead of dereferencing undefined.
      if (err || !station) return;
      const hasQueuedSongs = !!station.queue && station.queue.length > 0;
      if (!station.currentSong && hasQueuedSongs) {
        module.exports.initializeStation(stationId);
      }
    });
  });
  // On 'station.newOfficialPlaylist': read the station's entry from the
  // "officialPlaylists" cache hash and send its songs to the station's room.
  cache.sub('station.newOfficialPlaylist', (officialStationId) => {
    if (lockdown) return;
    cache.hget("officialPlaylists", officialStationId, (err, cachedPlaylist) => {
      // Nothing is emitted when the lookup fails or no playlist is cached.
      if (err || !cachedPlaylist) return;
      utils.emitToRoom(`station.${officialStationId}`, "event:newOfficialPlaylist", cachedPlaylist.songs);
    });
  });
  37. module.exports = {
  38. init: function(cb) {
  39. async.waterfall([
  40. (next) => {
  41. cache.hgetall('stations', next);
  42. },
  43. (stations, next) => {
  44. if (!stations) return next();
  45. let stationIds = Object.keys(stations);
  46. async.each(stationIds, (stationId, next) => {
  47. db.models.station.findOne({_id: stationId}, (err, station) => {
  48. if (err) next(err);
  49. else if (!station) {
  50. cache.hdel('stations', stationId, next);
  51. } else next();
  52. });
  53. }, next);
  54. },
  55. (next) => {
  56. db.models.station.find({}, next);
  57. },
  58. (stations, next) => {
  59. async.each(stations, (station, next) => {
  60. async.waterfall([
  61. (next) => {
  62. cache.hset('stations', station._id, cache.schemas.station(station), next);
  63. },
  64. (station, next) => {
  65. this.initializeStation(station._id, next);
  66. }
  67. ], (err) => {
  68. next(err);
  69. });
  70. }, next);
  71. }
  72. ], (err) => {
  73. if (lockdown) return this._lockdown();
  74. if (err) {
  75. err = utils.getError(err);
  76. cb(err);
  77. } else {
  78. initialized = true;
  79. cb();
  80. }
  81. });
  82. },
  83. initializeStation: function(stationId, cb) {
  84. if (lockdown) return;
  85. if (typeof cb !== 'function') cb = ()=>{};
  86. let _this = this;
  87. async.waterfall([
  88. (next) => {
  89. _this.getStation(stationId, next);
  90. },
  91. (station, next) => {
  92. if (!station) return next('Station not found.');
  93. notifications.unschedule(`stations.nextSong?id=${station._id}`);
  94. notifications.subscribe(`stations.nextSong?id=${station._id}`, _this.skipStation(station._id), true, station);
  95. if (station.paused) return next(true, station);
  96. next(null, station);
  97. },
  98. (station, next) => {
  99. if (!station.currentSong) {
  100. return _this.skipStation(station._id)((err, station) => {
  101. if (err) return next(err);
  102. return next(true, station);
  103. });
  104. }
  105. let timeLeft = ((station.currentSong.duration * 1000) - (Date.now() - station.startedAt - station.timePaused));
  106. if (isNaN(timeLeft)) timeLeft = -1;
  107. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  108. this.skipStation(station._id)((err, station) => {
  109. next(err, station);
  110. });
  111. } else {
  112. notifications.schedule(`stations.nextSong?id=${station._id}`, timeLeft, null, station);
  113. next(null, station);
  114. }
  115. }
  116. ], (err, station) => {
  117. if (err && err !== true) return cb(err);
  118. cb(null, station);
  119. });
  120. },
  121. calculateSongForStation: function(station, cb) {
  122. if (lockdown) return;
  123. let _this = this;
  124. let songList = [];
  125. async.waterfall([
  126. (next) => {
  127. let genresDone = [];
  128. station.genres.forEach((genre) => {
  129. db.models.song.find({genres: genre}, (err, songs) => {
  130. if (!err) {
  131. songs.forEach((song) => {
  132. if (songList.indexOf(song._id) === -1) {
  133. let found = false;
  134. song.genres.forEach((songGenre) => {
  135. if (station.blacklistedGenres.indexOf(songGenre) !== -1) found = true;
  136. });
  137. if (!found) {
  138. songList.push(song._id);
  139. }
  140. }
  141. });
  142. }
  143. genresDone.push(genre);
  144. if (genresDone.length === station.genres.length) next();
  145. });
  146. });
  147. },
  148. (next) => {
  149. let playlist = [];
  150. songList.forEach(function(songId) {
  151. if(station.playlist.indexOf(songId) === -1) playlist.push(songId);
  152. });
  153. station.playlist.filter((songId) => {
  154. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  155. });
  156. playlist = utils.shuffle(playlist);
  157. _this.calculateOfficialPlaylistList(station._id, playlist, () => {
  158. next(null, playlist);
  159. });
  160. },
  161. (playlist, next) => {
  162. db.models.station.update({_id: station._id}, {$set: {playlist: playlist}}, {runValidators: true}, (err) => {
  163. _this.updateStation(station._id, () => {
  164. next(err, playlist);
  165. });
  166. });
  167. }
  168. ], (err, newPlaylist) => {
  169. cb(err, newPlaylist);
  170. });
  171. },
  172. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  173. getStation: function(stationId, cb) {
  174. if (lockdown) return;
  175. let _this = this;
  176. async.waterfall([
  177. (next) => {
  178. cache.hget('stations', stationId, next);
  179. },
  180. (station, next) => {
  181. if (station) return next(true, station);
  182. db.models.station.findOne({ _id: stationId }, next);
  183. },
  184. (station, next) => {
  185. if (station) {
  186. if (station.type === 'official') {
  187. _this.calculateOfficialPlaylistList(station._id, station.playlist, () => {});
  188. }
  189. station = cache.schemas.station(station);
  190. cache.hset('stations', stationId, station);
  191. next(true, station);
  192. } else next('Station not found');
  193. },
  194. ], (err, station) => {
  195. if (err && err !== true) return cb(err);
  196. cb(null, station);
  197. });
  198. },
  199. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  200. getStationByName: function(stationName, cb) {
  201. if (lockdown) return;
  202. let _this = this;
  203. async.waterfall([
  204. (next) => {
  205. db.models.station.findOne({ name: stationName }, next);
  206. },
  207. (station, next) => {
  208. if (station) {
  209. if (station.type === 'official') {
  210. _this.calculateOfficialPlaylistList(station._id, station.playlist, ()=>{});
  211. }
  212. station = cache.schemas.station(station);
  213. cache.hset('stations', station._id, station);
  214. next(true, station);
  215. } else next('Station not found');
  216. },
  217. ], (err, station) => {
  218. if (err && err !== true) return cb(err);
  219. cb(null, station);
  220. });
  221. },
  222. updateStation: function(stationId, cb) {
  223. if (lockdown) return;
  224. let _this = this;
  225. async.waterfall([
  226. (next) => {
  227. db.models.station.findOne({ _id: stationId }, next);
  228. },
  229. (station, next) => {
  230. if (!station) {
  231. cache.hdel('stations', stationId);
  232. return next('Station not found');
  233. }
  234. cache.hset('stations', stationId, station, next);
  235. }
  236. ], (err, station) => {
  237. if (err && err !== true) return cb(err);
  238. cb(null, station);
  239. });
  240. },
  241. calculateOfficialPlaylistList: (stationId, songList, cb) => {
  242. if (lockdown) return;
  243. let lessInfoPlaylist = [];
  244. async.each(songList, (song, next) => {
  245. songs.getSongFromId(song, (err, song) => {
  246. if (!err && song) {
  247. let newSong = {
  248. songId: song.songId,
  249. title: song.title,
  250. artists: song.artists,
  251. duration: song.duration
  252. };
  253. lessInfoPlaylist.push(newSong);
  254. }
  255. next();
  256. });
  257. }, () => {
  258. cache.hset("officialPlaylists", stationId, cache.schemas.officialPlaylist(stationId, lessInfoPlaylist), () => {
  259. cache.pub("station.newOfficialPlaylist", stationId);
  260. cb();
  261. });
  262. });
  263. },
  264. skipStation: function(stationId) {
  265. if (lockdown) return;
  266. logger.info("STATION_SKIP", `Skipping station ${stationId}.`, false);
  267. let _this = this;
  268. return (cb) => {
  269. if (lockdown) return;
  270. if (typeof cb !== 'function') cb = ()=>{};
  271. async.waterfall([
  272. (next) => {
  273. _this.getStation(stationId, next);
  274. },
  275. (station, next) => {
  276. if (!station) return next('Station not found.');
  277. if (station.type === 'community' && station.partyMode && station.queue.length === 0) return next(null, null, -11, station); // Community station with party mode enabled and no songs in the queue
  278. if (station.type === 'community' && station.partyMode && station.queue.length > 0) { // Community station with party mode enabled and songs in the queue
  279. return db.models.station.update({_id: stationId}, {$pull: {queue: {_id: station.queue[0]._id}}}, (err) => {
  280. if (err) return next(err);
  281. next(null, station.queue[0], -12, station);
  282. });
  283. }
  284. if (station.type === 'community' && !station.partyMode) {
  285. return db.models.playlist.findOne({_id: station.privatePlaylist}, (err, playlist) => {
  286. if (err) return next(err);
  287. if (!playlist) return next(null, null, -13, station);
  288. playlist = playlist.songs;
  289. if (playlist.length > 0) {
  290. let currentSongIndex;
  291. if (station.currentSongIndex < playlist.length - 1) currentSongIndex = station.currentSongIndex + 1;
  292. else currentSongIndex = 0;
  293. let callback = (err, song) => {
  294. if (err) return next(err);
  295. if (song) return next(null, song, currentSongIndex, station);
  296. else {
  297. let song = playlist[currentSongIndex];
  298. let currentSong = {
  299. songId: song.songId,
  300. title: song.title,
  301. duration: song.duration,
  302. likes: -1,
  303. dislikes: -1
  304. };
  305. return next(null, currentSong, currentSongIndex, station);
  306. }
  307. };
  308. if (playlist[currentSongIndex]._id) songs.getSong(playlist[currentSongIndex]._id, callback);
  309. else songs.getSongFromId(playlist[currentSongIndex].songId, callback);
  310. } else return next(null, null, -14, station);
  311. });
  312. }
  313. if (station.type === 'official' && station.playlist.length === 0) {
  314. return _this.calculateSongForStation(station, (err, playlist) => {
  315. if (err) return next(err);
  316. if (playlist.length === 0) return next(null, _this.defaultSong, 0, station);
  317. else {
  318. songs.getSong(playlist[0], (err, song) => {
  319. if (err || !song) return next(null, _this.defaultSong, 0, station);
  320. return next(null, song, 0, station);
  321. });
  322. }
  323. });
  324. }
  325. if (station.type === 'official' && station.playlist.length > 0) {
  326. async.doUntil((next) => {
  327. if (station.currentSongIndex < station.playlist.length - 1) {
  328. songs.getSong(station.playlist[station.currentSongIndex + 1], (err, song) => {
  329. if (!err) return next(null, song, station.currentSongIndex + 1);
  330. else {
  331. station.currentSongIndex++;
  332. next(null, null);
  333. }
  334. });
  335. } else {
  336. _this.calculateSongForStation(station, (err, newPlaylist) => {
  337. if (err) return next(null, _this.defaultSong, 0);
  338. songs.getSong(newPlaylist[0], (err, song) => {
  339. if (err || !song) return next(null, _this.defaultSong, 0);
  340. station.playlist = newPlaylist;
  341. next(null, song, 0);
  342. });
  343. });
  344. }
  345. }, (song) => {
  346. return !!song;
  347. }, (err, song, currentSongIndex) => {
  348. return next(err, song, currentSongIndex, station);
  349. });
  350. }
  351. },
  352. (song, currentSongIndex, station, next) => {
  353. let $set = {};
  354. if (song === null) $set.currentSong = null;
  355. else if (song.likes === -1 && song.dislikes === -1) {
  356. $set.currentSong = {
  357. songId: song.songId,
  358. title: song.title,
  359. duration: song.duration,
  360. skipDuration: 0,
  361. likes: -1,
  362. dislikes: -1
  363. };
  364. } else {
  365. $set.currentSong = {
  366. songId: song.songId,
  367. title: song.title,
  368. artists: song.artists,
  369. duration: song.duration,
  370. likes: song.likes,
  371. dislikes: song.dislikes,
  372. skipDuration: song.skipDuration,
  373. thumbnail: song.thumbnail
  374. };
  375. }
  376. if (currentSongIndex >= 0) $set.currentSongIndex = currentSongIndex;
  377. $set.startedAt = Date.now();
  378. $set.timePaused = 0;
  379. if (station.paused) $set.pausedAt = Date.now();
  380. next(null, $set, station);
  381. },
  382. ($set, station, next) => {
  383. db.models.station.update({_id: station._id}, {$set}, (err) => {
  384. _this.updateStation(station._id, (err, station) => {
  385. if (station.type === 'community' && station.partyMode === true)
  386. cache.pub('station.queueUpdate', stationId);
  387. next(null, station);
  388. });
  389. });
  390. },
  391. ], (err, station) => {
  392. if (!err) {
  393. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  394. station.currentSong.skipVotes = 0;
  395. }
  396. //TODO Pub/Sub this
  397. utils.emitToRoom(`station.${station._id}`, "event:songs.next", {
  398. currentSong: station.currentSong,
  399. startedAt: station.startedAt,
  400. paused: station.paused,
  401. timePaused: 0
  402. });
  403. if (station.privacy === 'public') utils.emitToRoom('home', "event:station.nextSong", station._id, station.currentSong);
  404. else {
  405. let sockets = utils.getRoomSockets('home');
  406. for (let socketId in sockets) {
  407. let socket = sockets[socketId];
  408. let session = sockets[socketId].session;
  409. if (session.sessionId) {
  410. cache.hget('sessions', session.sessionId, (err, session) => {
  411. if (!err && session) {
  412. db.models.user.findOne({_id: session.userId}, (err, user) => {
  413. if (!err && user) {
  414. if (user.role === 'admin') socket.emit("event:station.nextSong", station._id, station.currentSong);
  415. else if (station.type === "community" && station.owner === session.userId) socket.emit("event:station.nextSong", station._id, station.currentSong);
  416. }
  417. });
  418. }
  419. });
  420. }
  421. }
  422. }
  423. if (station.currentSong !== null && station.currentSong.songId !== undefined) {
  424. utils.socketsJoinSongRoom(utils.getRoomSockets(`station.${station._id}`), `song.${station.currentSong.songId}`);
  425. if (!station.paused) {
  426. notifications.schedule(`stations.nextSong?id=${station._id}`, station.currentSong.duration * 1000, null, station);
  427. }
  428. } else {
  429. utils.socketsLeaveSongRooms(utils.getRoomSockets(`station.${station._id}`));
  430. }
  431. cb(null, station);
  432. } else {
  433. err = utils.getError(err);
  434. logger.error('SKIP_STATION', `Skipping station "${stationId}" failed. "${err}"`);
  435. cb(err);
  436. }
  437. });
  438. }
  439. },
  440. defaultSong: {
  441. songId: '60ItHLz5WEA',
  442. title: 'Faded - Alan Walker',
  443. duration: 212,
  444. skipDuration: 0,
  445. likes: -1,
  446. dislikes: -1
  447. },
  448. _lockdown: () => {
  449. lockdown = true;
  450. }
  451. };