stations.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. 'use strict';
  2. const cache = require('./cache');
  3. const db = require('./db');
  4. const io = require('./io');
  5. const utils = require('./utils');
  6. const songs = require('./songs');
  7. const notifications = require('./notifications');
  8. const async = require('async');
  9. let skipTimeout = null;
  10. //TEMP
  11. cache.sub('station.pause', (stationId) => {
  12. notifications.remove(`stations.nextSong?id=${stationId}`);
  13. });
  14. cache.sub('station.resume', (stationId) => {
  15. module.exports.initializeStation(stationId)
  16. });
  17. cache.sub('station.queueUpdate', (stationId) => {
  18. module.exports.getStation(stationId, (err, station) => {
  19. if (!station.currentSong && station.queue.length > 0) {
  20. module.exports.initializeStation(stationId);
  21. }
  22. });
  23. });
  24. module.exports = {
  25. init: function(cb) {
  26. let _this = this;
  27. //TODO Add async waterfall
  28. db.models.station.find({}, (err, stations) => {
  29. if (!err) {
  30. stations.forEach((station) => {
  31. console.info("Initializing Station: " + station._id);
  32. _this.initializeStation(station._id);
  33. });
  34. cb();
  35. }
  36. });
  37. },
  38. initializeStation: function(stationId, cb) {
  39. console.log(112233, stationId, cb);
  40. if (typeof cb !== 'function') cb = ()=>{};
  41. let _this = this;
  42. _this.getStation(stationId, (err, station) => {
  43. if (!err) {
  44. console.log("###");
  45. if (station) {
  46. console.log("###1");
  47. let notification = notifications.subscribe(`stations.nextSong?id=${station._id}`, _this.skipStation(station._id), true);
  48. if (!station.paused ) {
  49. /*if (!station.startedAt) {
  50. station.startedAt = Date.now();
  51. station.timePaused = 0;
  52. cache.hset('stations', stationId, station);
  53. }*/
  54. if (station.currentSong) {
  55. let timeLeft = ((station.currentSong.duration * 1000) - (Date.now() - station.startedAt - station.timePaused));
  56. if (isNaN(timeLeft)) timeLeft = -1;
  57. if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
  58. console.log("Test");
  59. this.skipStation(station._id)((err, station) => {
  60. console.log(45, err, station);
  61. cb(err, station);
  62. });
  63. } else {
  64. notifications.schedule(`stations.nextSong?id=${station._id}`, timeLeft);
  65. cb(null, station);
  66. }
  67. } else {
  68. _this.skipStation(station._id)((err, station) => {
  69. console.log(47, err, station);
  70. cb(err, station);
  71. });
  72. }
  73. } else {
  74. notifications.unschedule(`stations.nextSong?id${station._id}`);
  75. cb(null, station);
  76. }
  77. } else cb("Station not found.");
  78. } else cb(err);
  79. });
  80. },
  81. calculateSongForStation: function(station, cb) {
  82. let _this = this;
  83. let songList = [];
  84. async.waterfall([
  85. (next) => {
  86. let genresDone = [];
  87. station.genres.forEach((genre) => {
  88. db.models.song.find({genres: genre}, (err, songs) => {
  89. if (!err) {
  90. songs.forEach((song) => {
  91. if (songList.indexOf(song._id) === -1) {
  92. let found = false;
  93. song.genres.forEach((songGenre) => {
  94. if (station.blacklistedGenres.indexOf(songGenre) !== -1) found = true;
  95. console.log(songGenre, station.blacklistedGenres, station.blacklistedGenres.indexOf(songGenre), found);
  96. });
  97. if (!found) {
  98. songList.push(song._id);
  99. }
  100. }
  101. });
  102. }
  103. genresDone.push(genre);
  104. if (genresDone.length === station.genres.length) {
  105. next();
  106. }
  107. });
  108. });
  109. },
  110. (next) => {
  111. let playlist = [];
  112. songList.forEach(function(songId) {
  113. if(station.playlist.indexOf(songId) === -1) playlist.push(songId);
  114. });
  115. station.playlist.filter((songId) => {
  116. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  117. });
  118. db.models.station.update({_id: station._id}, {$set: {playlist: playlist}}, (err) => {
  119. _this.updateStation(station._id, () => {
  120. next(err, playlist);
  121. });
  122. });
  123. }
  124. ], (err, newPlaylist) => {
  125. cb(err, newPlaylist);
  126. });
  127. },
  128. // Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
  129. getStation: function(stationId, cb) {
  130. async.waterfall([
  131. (next) => {
  132. cache.hget('stations', stationId, next);
  133. },
  134. (station, next) => {
  135. if (station) return next(true, station);
  136. db.models.station.findOne({ _id: stationId }, next);
  137. },
  138. (station, next) => {
  139. if (station) {
  140. station = cache.schemas.station(station);
  141. console.log(1234321, stationId);
  142. cache.hset('stations', stationId, station);
  143. next(true, station);
  144. } else next('Station not found.');
  145. },
  146. ], (err, station) => {
  147. if (err && err !== true) cb(err);
  148. cb(null, station);
  149. });
  150. },
  151. updateStation: (stationId, cb) => {
  152. async.waterfall([
  153. (next) => {
  154. db.models.station.findOne({ _id: stationId }, next);
  155. },
  156. (station, next) => {
  157. if (!station) return next('Station not found.');
  158. console.log(123444321, stationId);
  159. cache.hset('stations', stationId, station, (err) => {
  160. if (err) return next(err);
  161. next(null, station);
  162. });
  163. }
  164. ], (err, station) => {
  165. if (err && err !== true) cb(err);
  166. cb(null, station);
  167. });
  168. },
  169. skipStation: function(stationId) {
  170. let _this = this;
  171. return (cb) => {
  172. if (typeof cb !== 'function') cb = ()=>{};
  173. console.log("###2");
  174. console.log("NOTIFICATION!!!");
  175. _this.getStation(stationId, (err, station) => {
  176. console.log("###3");
  177. if (station) {
  178. console.log("###4");
  179. // notify all the sockets on this station to go to the next song
  180. async.waterfall([
  181. (next) => {
  182. console.log("###5");
  183. if (station.type === "official") {
  184. if (station.playlist.length > 0) {
  185. function func() {
  186. if (station.currentSongIndex < station.playlist.length - 1) {
  187. songs.getSong(station.playlist[station.currentSongIndex + 1], (err, song) => {
  188. if (!err) {
  189. let $set = {};
  190. $set.currentSong = {
  191. _id: song._id,
  192. title: song.title,
  193. artists: song.artists,
  194. duration: song.duration,
  195. likes: song.likes,
  196. dislikes: song.dislikes,
  197. skipDuration: song.skipDuration,
  198. thumbnail: song.thumbnail
  199. };
  200. $set.startedAt = Date.now();
  201. $set.timePaused = 0;
  202. $set.currentSongIndex = station.currentSongIndex + 1;
  203. next(null, $set);
  204. } else {
  205. db.models.station.update({_id: station._id}, {$inc: {currentSongIndex: 1}}, (err) => {
  206. _this.updateStation(station._id, () => {
  207. func();
  208. });
  209. });
  210. }
  211. });
  212. } else {
  213. db.models.station.update({_id: station._id}, {$set: {currentSongIndex: 0}}, (err) => {
  214. _this.updateStation(station._id, (err, station) => {
  215. console.log(12345678, err, station);
  216. _this.calculateSongForStation(station, (err, newPlaylist) => {
  217. console.log('New playlist: ', newPlaylist);
  218. if (!err) {
  219. songs.getSong(newPlaylist[0], (err, song) => {
  220. let $set = {};
  221. if (song) {
  222. $set.currentSong = {
  223. _id: song._id,
  224. title: song.title,
  225. artists: song.artists,
  226. duration: song.duration,
  227. likes: song.likes,
  228. dislikes: song.dislikes,
  229. skipDuration: song.skipDuration,
  230. thumbnail: song.thumbnail
  231. };
  232. station.playlist = newPlaylist;
  233. } else {
  234. $set.currentSong = _this.defaultSong;
  235. }
  236. $set.startedAt = Date.now();
  237. $set.timePaused = 0;
  238. next(null, $set);
  239. });
  240. } else {
  241. let $set = {};
  242. $set.currentSong = _this.defaultSong;
  243. $set.startedAt = Date.now();
  244. $set.timePaused = 0;
  245. next(null, $set);
  246. }
  247. })
  248. });
  249. });
  250. }
  251. }
  252. func();
  253. } else {
  254. _this.calculateSongForStation(station, (err, playlist) => {
  255. if (!err && playlist.length === 0) {
  256. let $set = {};
  257. $set.currentSongIndex = 0;
  258. $set.currentSong = _this.defaultSong;
  259. $set.startedAt = Date.now();
  260. $set.timePaused = 0;
  261. next(null, $set);
  262. } else {
  263. songs.getSong(playlist[0], (err, song) => {
  264. let $set = {};
  265. if (!err) {
  266. $set.currentSong = {
  267. _id: song._id,
  268. title: song.title,
  269. artists: song.artists,
  270. duration: song.duration,
  271. likes: song.likes,
  272. dislikes: song.dislikes,
  273. skipDuration: song.skipDuration,
  274. thumbnail: song.thumbnail
  275. };
  276. } else {
  277. $set.currentSong = _this.defaultSong;
  278. }
  279. $set.currentSongIndex = 0;
  280. $set.startedAt = Date.now();
  281. $set.timePaused = 0;
  282. next(null, $set);
  283. });
  284. }
  285. });
  286. }
  287. } else {
  288. if (station.partyMode === true) {
  289. if (station.queue.length > 0) {
  290. console.log("##");
  291. db.models.station.update({_id: stationId}, {$pull: {queue: {songId: station.queue[0]._id}}}, (err) => {
  292. console.log("##1", err);
  293. if (err) return next(err);
  294. let $set = {};
  295. $set.currentSong = station.queue[0];
  296. $set.startedAt = Date.now();
  297. $set.timePaused = 0;
  298. if (station.paused) {
  299. $set.pausedAt = Date.now();
  300. }
  301. next(null, $set);
  302. });
  303. } else {
  304. console.log("##2");
  305. next(null, {currentSong: null});
  306. }
  307. } else {
  308. db.models.playlist.findOne({_id: station.privatePlaylist}, (err, playlist) => {
  309. console.log(station.privatePlaylist, err, playlist);
  310. if (err || !playlist) return next(null, {currentSong: null});
  311. playlist = playlist.songs;
  312. if (playlist.length > 0) {
  313. let $set = {};
  314. if (station.currentSongIndex < playlist.length - 1) {
  315. $set.currentSongIndex = station.currentSongIndex + 1;
  316. } else {
  317. $set.currentSongIndex = 0;
  318. }
  319. songs.getSong(playlist[$set.currentSongIndex]._id, (err, song) => {
  320. if (!err && song) {
  321. $set.currentSong = {
  322. _id: song._id,
  323. title: song.title,
  324. artists: song.artists,
  325. duration: song.duration,
  326. likes: song.likes,
  327. dislikes: song.dislikes,
  328. skipDuration: song.skipDuration,
  329. thumbnail: song.thumbnail
  330. };
  331. } else {
  332. let song = playlist[$set.currentSongIndex];
  333. $set.currentSong = {
  334. _id: song._id,
  335. title: song.title,
  336. duration: song.duration,
  337. likes: -1,
  338. dislikes: -1
  339. };
  340. }
  341. $set.startedAt = Date.now();
  342. $set.timePaused = 0;
  343. next(null, $set);
  344. });
  345. } else {
  346. next(null, {currentSong: null});
  347. }
  348. });
  349. }
  350. }
  351. },
  352. ($set, next) => {
  353. console.log("$set", $set);
  354. db.models.station.update({_id: station._id}, {$set}, (err) => {
  355. console.log("##2.5", err);
  356. _this.updateStation(station._id, (err, station) => {
  357. console.log("##2.6", err);
  358. if (station.type === 'community' && station.partyMode === true) {
  359. cache.pub('station.queueUpdate', stationId);
  360. }
  361. next(null, station);
  362. });
  363. });
  364. },
  365. ], (err, station) => {
  366. console.log("##3", err);
  367. if (!err) {
  368. if (station.currentSong !== null && station.currentSong._id !== undefined) {
  369. station.currentSong.skipVotes = 0;
  370. }
  371. utils.emitToRoom(`station.${station._id}`, "event:songs.next", {
  372. currentSong: station.currentSong,
  373. startedAt: station.startedAt,
  374. paused: station.paused,
  375. timePaused: 0
  376. });
  377. if (station.currentSong !== null && station.currentSong._id !== undefined) {
  378. utils.socketsJoinSongRoom(utils.getRoomSockets(`station.${station._id}`), `song.${station.currentSong._id}`);
  379. console.log("NEXT SONG!!!", station.currentSong);
  380. if (!station.paused) {
  381. notifications.schedule(`stations.nextSong?id=${station._id}`, station.currentSong.duration * 1000);
  382. }
  383. } else {
  384. console.log("22", !!(station.currentSong));
  385. utils.socketsLeaveSongRooms(utils.getRoomSockets(`station.${station._id}`), `song.${station.currentSong._id}`);
  386. }
  387. cb(null, station);
  388. } else cb(err);
  389. });
  390. }
  391. // the station doesn't exist anymore, unsubscribe from it
  392. else {
  393. cb("Station not found.");
  394. }
  395. });
  396. }
  397. },
  398. defaultSong: {
  399. _id: '60ItHLz5WEA',
  400. title: 'Faded',
  401. artists: ['Alan Walker'],
  402. duration: 212,
  403. skipDuration: 0,
  404. thumbnail: 'https://i.scdn.co/image/2ddde58427f632037093857ebb71a67ddbdec34b'
  405. }
  406. };