stations.js 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. 'use strict';
  2. // This file contains the station logic: loading stations at startup, building playlists and rotating songs
  3. const cache = require('./cache');
  4. const db = require('./db');
  5. const io = require('./io');
  6. const utils = require('./utils');
  7. const notifications = require('./notifications');
  8. const async = require('async');
  9. let skipTimeout = null;
  10. module.exports = {
  11. init: function(cb) {
  12. let _this = this;
  13. console.log("Init stations");
  14. db.models.station.find({}, (err, stations) => {
  15. if (!err) {
  16. stations.forEach((station) => {
  17. console.log("Initing " + station._id);
  18. _this.initializeAndReturnStation(station._id, (err, station) => {
  19. //TODO Emit to homepage and admin station list
  20. });
  21. });
  22. cb();
  23. }
  24. });
  25. },
  26. calculateSongForStation: (station, cb) => {
  27. let songList = [];
  28. async.waterfall([
  29. (next) => {
  30. let genresDone = [];
  31. station.genres.forEach((genre) => {
  32. db.models.song.find({genres: genre}, (err, songs) => {
  33. if (!err) {
  34. songs.forEach((song) => {
  35. if (songList.indexOf(song._id) === -1) songList.push(song._id);
  36. });
  37. }
  38. genresDone.push(genre);
  39. if (genresDone.length === station.genres.length) {
  40. next();
  41. }
  42. });
  43. });
  44. },
  45. (next) => {
  46. let playlist = [];
  47. songList.forEach(function(songId) {
  48. if(station.playlist.indexOf(songId) === -1) playlist.push(songId);
  49. });
  50. station.playlist.filter((songId) => {
  51. if (songList.indexOf(songId) !== -1) playlist.push(songId);
  52. });
  53. db.models.station.update({_id: station._id}, {$set: {playlist: playlist}}, (err, result) => {
  54. next(err, playlist);
  55. });
  56. }
  57. ], (err, newPlaylist) => {
  58. cb(err, newPlaylist);
  59. });
  60. },
  61. initializeAndReturnStation: function(stationId, cb) {
  62. let _this = this;
  63. async.waterfall([
  64. // first check the cache for the station
  65. (next) => cache.hget('stations', stationId, next),
  66. // if the cached version exist
  67. (station, next) => {
  68. if (station) return next(true, station);
  69. db.models.station.findOne({ _id: stationId }, next);
  70. },
  71. // if the station exists in the DB, add it to the cache
  72. (station, next) => {
  73. if (!station) return cb('Station by that id does not exist');
  74. station = cache.schemas.station(station);
  75. cache.hset('stations', station._id, station, (err) => next(err, station));
  76. }
  77. ], (err, station) => {
  78. if (err && err !== true) return cb(err);
  79. // get notified when the next song for this station should play, so that we can notify our sockets
  80. /*let notification = notifications.subscribe(`stations.nextSong?id=${station._id}`, () => {*/
  81. function skipSongTemp() {
  82. // get the station from the cache
  83. console.log('NOTIFICATION');
  84. //TODO Recalculate songs if the last song of the station playlist is getting played
  85. cache.hget('stations', station._id, (err, station) => {
  86. if (station) {
  87. // notify all the sockets on this station to go to the next song
  88. async.waterfall([
  89. (next) => {
  90. if (station.currentSongIndex < station.playlist.length - 1) {
  91. station.currentSongIndex++;
  92. db.models.song.findOne({_id: station.playlist[station.currentSongIndex]}, (err, song) => {
  93. if (!err) {
  94. station.currentSong = {
  95. _id: song._id,
  96. title: song.title,
  97. artists: song.artists,
  98. duration: song.duration,
  99. likes: song.likes,
  100. dislikes: song.dislikes,
  101. skipDuration: song.skipDuration,
  102. thumbnail: song.thumbnail
  103. };
  104. station.startedAt = Date.now();
  105. next(null, station);
  106. }
  107. });
  108. } else {
  109. station.currentSongIndex = 0;
  110. _this.calculateSongForStation(station, (err, newPlaylist) => {
  111. if (!err) {
  112. db.models.song.findOne({_id: newPlaylist[0]}, (err, song) => {
  113. if (!err) {
  114. station.currentSong = {
  115. _id: song._id,
  116. title: song.title,
  117. artists: song.artists,
  118. duration: song.duration,
  119. likes: song.likes,
  120. dislikes: song.dislikes,
  121. skipDuration: song.skipDuration,
  122. thumbnail: song.thumbnail
  123. };
  124. station.startedAt = Date.now();
  125. station.playlist = newPlaylist;
  126. next(null, station);
  127. }
  128. });
  129. }
  130. })
  131. }
  132. },
  133. (station, next) => {
  134. cache.hset('stations', station._id, station, (err) => next(err, station));
  135. //TODO Also save to DB
  136. },
  137. ], (err, station) => {
  138. io.io.to(`station.${stationId}`).emit("event:songs.next", {
  139. currentSong: station.currentSong,
  140. startedAt: station.startedAt,
  141. paused: station.paused,
  142. timePaused: 0
  143. });
  144. // schedule a notification to be dispatched when the next song ends
  145. notifications.schedule(`stations.nextSong?id=${station.id}`, station.currentSong.duration * 1000);
  146. skipTimeout = setTimeout(skipSongTemp, station.currentSong.duration * 1000);
  147. });
  148. }
  149. // the station doesn't exist anymore, unsubscribe from it
  150. else {
  151. notifications.remove(notification);
  152. }
  153. });
  154. }//, true);
  155. if (!station.paused) {
  156. if (!station.startedAt) {
  157. station.startedAt = Date.now();
  158. cache.hset('stations', stationId, station);
  159. }
  160. //setTimeout(skipSongTemp, station.currentSong.duration * 1000);
  161. if (skipTimeout === null) {
  162. skipTimeout = setTimeout(skipSongTemp, 1000);
  163. }
  164. notifications.schedule(`stations.nextSong?id=${station.id}`, station.currentSong.duration * 1000);
  165. }
  166. return cb(null, station);
  167. });
  168. }
  169. };