stations.js 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. 'use strict';
  2. // This file contains the logic for stations: initializing them, calculating their playlists and skipping to the next song
  3. const cache = require('./cache');
  4. const db = require('./db');
  5. const io = require('./io');
  6. const utils = require('./utils');
  7. const notifications = require('./notifications');
  8. const async = require('async');
  9. let skipTimeout = null;
  10. module.exports = {
  11. init: function(cb) {
  12. let _this = this;
  13. db.models.station.find({}, (err, stations) => {
  14. if (!err) {
  15. stations.forEach((station) => {
  16. console.info("Initializing Station: " + station._id);
  17. _this.initializeAndReturnStation(station._id, (err, station) => {
  18. //TODO Emit to homepage and admin station list
  19. });
  20. });
  21. cb();
  22. }
  23. });
  24. },
  25. calculateSongForStation: (station, cb) => {
  26. let songList = [];
  27. async.waterfall([
  28. (next) => {
  29. let genresDone = [];
  30. station.genres.forEach((genre) => {
  31. db.models.song.find({genres: genre}, (err, songs) => {
  32. if (!err) {
  33. songs.forEach((song) => {
  34. if (songList.indexOf(song._id) === -1) songList.push(song._id);
  35. });
  36. }
  37. genresDone.push(genre);
  38. if (genresDone.length === station.genres.length) {
  39. next();
  40. }
  41. });
  42. });
  43. },
  44. (next) => {
  45. let playlist = [];
  46. songList.forEach(function(songId) {
  47. if(station.playlist.indexOf(songId) === -1) playlist.push(songId);
  48. });
  49. station.playlist.filter((songId) => {
  50. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  51. });
  52. db.models.station.update({_id: station._id}, {$set: {playlist: playlist}}, (err, result) => {
  53. next(err, playlist);
  54. });
  55. }
  56. ], (err, newPlaylist) => {
  57. cb(err, newPlaylist);
  58. });
  59. },
  60. initializeAndReturnStation: function(stationId, cb) {
  61. let _this = this;
  62. async.waterfall([
  63. // first check the cache for the station
  64. (next) => cache.hget('stations', stationId, next),
  65. // if the cached version exist
  66. (station, next) => {
  67. if (station) return next(true, station);
  68. db.models.station.findOne({ _id: stationId }, next);
  69. },
  70. // if the station exists in the DB, add it to the cache
  71. (station, next) => {
  72. if (!station) return cb('Station by that id does not exist');
  73. station = cache.schemas.station(station);
  74. cache.hset('stations', station._id, station, (err) => next(err, station));
  75. }
  76. ], (err, station) => {
  77. if (err && err !== true) return cb(err);
  78. // get notified when the next song for this station should play, so that we can notify our sockets
  79. /*let notification = notifications.subscribe(`stations.nextSong?id=${station._id}`, () => {*/
  80. function skipSongTemp() {
  81. // get the station from the cache
  82. //TODO Recalculate songs if the last song of the station playlist is getting played
  83. cache.hget('stations', station._id, (err, station) => {
  84. if (station) {
  85. // notify all the sockets on this station to go to the next song
  86. async.waterfall([
  87. (next) => {
  88. if (station.currentSongIndex < station.playlist.length - 1) {
  89. station.currentSongIndex++;
  90. db.models.song.findOne({ _id: station.playlist[station.currentSongIndex] }, (err, song) => {
  91. if (!err) {
  92. station.currentSong = {
  93. _id: song._id,
  94. title: song.title,
  95. artists: song.artists,
  96. duration: song.duration,
  97. likes: song.likes,
  98. dislikes: song.dislikes,
  99. skipDuration: song.skipDuration,
  100. thumbnail: song.thumbnail
  101. };
  102. station.startedAt = Date.now();
  103. next(null, station);
  104. }
  105. });
  106. } else {
  107. station.currentSongIndex = 0;
  108. _this.calculateSongForStation(station, (err, newPlaylist) => {
  109. console.log('New playlist: ', newPlaylist)
  110. if (!err) {
  111. db.models.song.findOne({ _id: newPlaylist[0] }, (err, song) => {
  112. if (song) {
  113. station.currentSong = {
  114. _id: song._id,
  115. title: song.title,
  116. artists: song.artists,
  117. duration: song.duration,
  118. likes: song.likes,
  119. dislikes: song.dislikes,
  120. skipDuration: song.skipDuration,
  121. thumbnail: song.thumbnail
  122. };
  123. station.startedAt = Date.now();
  124. station.playlist = newPlaylist;
  125. next(null, station);
  126. }
  127. });
  128. }
  129. })
  130. }
  131. },
  132. (station, next) => {
  133. cache.hset('stations', station._id, station, (err) => next(err, station));
  134. //TODO Also save to DB
  135. },
  136. ], (err, station) => {
  137. io.io.to(`station.${stationId}`).emit("event:songs.next", {
  138. currentSong: station.currentSong,
  139. startedAt: station.startedAt,
  140. paused: station.paused,
  141. timePaused: 0
  142. });
  143. // schedule a notification to be dispatched when the next song ends
  144. notifications.schedule(`stations.nextSong?id=${station.id}`, station.currentSong.duration * 1000);
  145. skipTimeout = setTimeout(skipSongTemp, station.currentSong.duration * 1000);
  146. });
  147. }
  148. // the station doesn't exist anymore, unsubscribe from it
  149. else {
  150. notifications.remove(notification);
  151. }
  152. });
  153. }//, true);
  154. if (!station.paused) {
  155. if (!station.startedAt) {
  156. station.startedAt = Date.now();
  157. cache.hset('stations', stationId, station);
  158. }
  159. //setTimeout(skipSongTemp, station.currentSong.duration * 1000);
  160. if (skipTimeout === null) {
  161. skipTimeout = setTimeout(skipSongTemp, 1000);
  162. }
  163. notifications.schedule(`stations.nextSong?id=${station.id}`, station.currentSong.duration * 1000);
  164. }
  165. return cb(null, station);
  166. });
  167. }
  168. };