Browse Source

Merge branch 'polishing' of github.com:Musare/MusareNode into polishing

Jonathan 4 years ago
parent
commit
30f57f6864

+ 17 - 9
backend/core.js

@@ -292,15 +292,22 @@ class MovingAverageCalculator {
 }
 
 export default class CoreClass {
-	// eslint-disable-next-line require-jsdoc
-	constructor(name) {
+	/**
+	 *
+	 * @param {string} name - the name of the class
+	 * @param {object} options - optional options
+	 * @param {number} options.concurrency - how many jobs can run at the same time
+	 * @param {object} options.priorities - custom priorities for jobs
+	 */
+	constructor(name, options) {
 		this.name = name;
 		this.status = "UNINITIALIZED";
 		// this.log("Core constructor");
-		this.jobQueue = new Queue((job, options) => this._runJob(job, options), 10);
+		this.concurrency = options && options.concurrency ? options.concurrency : 10;
+		this.jobQueue = new Queue((job, options) => this._runJob(job, options), this.concurrency);
 		this.jobQueue.pause();
 		this.runningJobs = [];
-		this.priorities = {};
+		this.priorities = options && options.priorities ? options.priorities : {};
 		this.stage = 0;
 		this.jobStatistics = {};
 
@@ -499,11 +506,12 @@ export default class CoreClass {
 
 		// if (options.bypassQueue) this._runJob(job, options, () => {});
 		// else {
-		const calculatedPriority = Math.min(
-			_priority || Infinity,
-			_parentJob ? _parentJob.task.priority : Infinity,
-			this.priorities[name] ? this.priorities[name] : 10
-		);
+		let calculatedPriority = null;
+		if (_priority) calculatedPriority = _priority;
+		else if (this.priorities[name]) calculatedPriority = this.priorities[name];
+		else if (_parentJob) calculatedPriority = _parentJob.task.priority;
+		else calculatedPriority = 10;
+
 		this.jobQueue.push(job, _options, calculatedPriority);
 
 		if (

+ 1 - 1
backend/logic/actions/playlists.js

@@ -697,7 +697,7 @@ export default {
 				);
 				return cb({
 					status: "success",
-					message: `Playlist has been imported. ${addSongsStats.successful} were addedd successfully, ${addSongsStats.failed} failed (${addSongsStats.alreadyInPlaylist} were already in the playlist)`,
+					message: `Playlist has been imported. ${addSongsStats.successful} were added successfully, ${addSongsStats.failed} failed (${addSongsStats.alreadyInPlaylist} were already in the playlist)`,
 					data: playlist.songs,
 					stats: {
 						videosInPlaylistTotal,

+ 156 - 0
backend/logic/actions/stations.js

@@ -108,6 +108,16 @@ CacheModule.runJob("SUB", {
 				room: `station.${stationId}`,
 				args: ["event:stations.pause", { pausedAt: station.pausedAt }]
 			});
+
+			StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
+				room: `home`,
+				station
+			}).then(response => {
+				const { socketsThatCan } = response;
+				socketsThatCan.forEach(socket => {
+					socket.emit("event:station.pause", { stationId });
+				});
+			});
 		});
 	}
 });
@@ -120,6 +130,118 @@ CacheModule.runJob("SUB", {
 				room: `station.${stationId}`,
 				args: ["event:stations.resume", { timePaused: station.timePaused }]
 			});
+
+			StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
+				room: `home`,
+				station
+			})
+				.then(response => {
+					const { socketsThatCan } = response;
+					socketsThatCan.forEach(socket => {
+						socket.emit("event:station.resume", { stationId });
+					});
+				})
+				.catch(console.log);
+		});
+	}
+});
+
+CacheModule.runJob("SUB", {
+	channel: "station.privacyUpdate",
+	cb: response => {
+		const { stationId, previousPrivacy } = response;
+		StationsModule.runJob("GET_STATION", { stationId }).then(station => {
+			if (previousPrivacy !== station.privacy) {
+				if (station.privacy === "public") {
+					// Station became public
+
+					IOModule.runJob("EMIT_TO_ROOM", {
+						room: "home",
+						args: ["event:stations.created", station]
+					});
+				} else if (previousPrivacy === "public") {
+					// Station became hidden
+
+					StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
+						room: `home`,
+						station
+					}).then(response => {
+						const { socketsThatCan, socketsThatCannot } = response;
+						socketsThatCan.forEach(socket => {
+							socket.emit("event:station.updatePrivacy", { stationId, privacy: station.privacy });
+						});
+						socketsThatCannot.forEach(socket => {
+							socket.emit("event:station.removed", { stationId });
+						});
+					});
+				} else {
+					// Station was hidden and is still hidden
+
+					StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
+						room: `home`,
+						station
+					}).then(response => {
+						const { socketsThatCan } = response;
+						socketsThatCan.forEach(socket => {
+							socket.emit("event:station.updatePrivacy", { stationId, privacy: station.privacy });
+						});
+					});
+				}
+			}
+		});
+	}
+});
+
+CacheModule.runJob("SUB", {
+	channel: "station.nameUpdate",
+	cb: response => {
+		const { stationId } = response;
+		StationsModule.runJob("GET_STATION", { stationId }).then(station => {
+			StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
+				room: `home`,
+				station
+			}).then(response => {
+				const { socketsThatCan } = response;
+				socketsThatCan.forEach(socket => {
+					socket.emit("event:station.updateName", { stationId, name: station.name });
+				});
+			});
+		});
+	}
+});
+
+CacheModule.runJob("SUB", {
+	channel: "station.displayNameUpdate",
+	cb: response => {
+		const { stationId } = response;
+		StationsModule.runJob("GET_STATION", { stationId }).then(station => {
+			StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
+				room: `home`,
+				station
+			}).then(response => {
+				const { socketsThatCan } = response;
+				socketsThatCan.forEach(socket => {
+					socket.emit("event:station.updateDisplayName", { stationId, displayName: station.displayName });
+				});
+			});
+		});
+	}
+});
+
+CacheModule.runJob("SUB", {
+	channel: "station.descriptionUpdate",
+	cb: response => {
+		const { stationId } = response;
+		StationsModule.runJob("GET_STATION", { stationId }).then(station => {
+			StationsModule.runJob("GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION", {
+				room: `home`,
+				station
+			}).then(response => {
+				const { socketsThatCan } = response;
+				socketsThatCan.forEach(socket => {
+					socket.emit("event:station.updateDescription", { stationId, description: station.description });
+				});
+			});
 		});
 	}
 });
@@ -153,6 +275,11 @@ CacheModule.runJob("SUB", {
 			room: `station.${stationId}`,
 			args: ["event:stations.remove"]
 		});
+		console.log(111, "REMOVED");
+		IOModule.runJob("EMIT_TO_ROOM", {
+			room: `home`,
+			args: ["event:station.removed", { stationId }]
+		});
 		IOModule.runJob("EMIT_TO_ROOM", {
 			room: "admin.stations",
 			args: ["event:admin.station.removed", stationId]
@@ -867,6 +994,10 @@ export default {
 					"STATIONS_UPDATE_NAME",
 					`Updated station "${stationId}" name to "${newName}" successfully.`
 				);
+				CacheModule.runJob("PUB", {
+					channel: "station.nameUpdate",
+					value: { stationId }
+				});
 				return cb({
 					status: "success",
 					message: "Successfully updated the name."
@@ -926,6 +1057,10 @@ export default {
 					"STATIONS_UPDATE_DISPLAY_NAME",
 					`Updated station "${stationId}" displayName to "${newDisplayName}" successfully.`
 				);
+				CacheModule.runJob("PUB", {
+					channel: "station.displayNameUpdate",
+					value: { stationId }
+				});
 				return cb({
 					status: "success",
 					message: "Successfully updated the display name."
@@ -985,6 +1120,10 @@ export default {
 					"STATIONS_UPDATE_DESCRIPTION",
 					`Updated station "${stationId}" description to "${newDescription}" successfully.`
 				);
+				CacheModule.runJob("PUB", {
+					channel: "station.descriptionUpdate",
+					value: { stationId }
+				});
 				return cb({
 					status: "success",
 					message: "Successfully updated the description."
@@ -1009,8 +1148,21 @@ export default {
 			},
 			this
 		);
+		let previousPrivacy = null;
 		async.waterfall(
 			[
+				next => {
+					stationModel.findOne({ _id: stationId }, next);
+				},
+
+				(station, next) => {
+					if (!station) next("No station found.");
+					else {
+						previousPrivacy = station.privacy;
+						next();
+					}
+				},
+
 				next => {
 					stationModel.updateOne(
 						{ _id: stationId },
@@ -1043,6 +1195,10 @@ export default {
 					"STATIONS_UPDATE_PRIVACY",
 					`Updated station "${stationId}" privacy to "${newPrivacy}" successfully.`
 				);
+				CacheModule.runJob("PUB", {
+					channel: "station.privacyUpdate",
+					value: { stationId, previousPrivacy }
+				});
 				return cb({
 					status: "success",
 					message: "Successfully updated the privacy."

+ 101 - 0
backend/logic/stations.js

@@ -1085,6 +1085,107 @@ class _StationsModule extends CoreClass {
 			);
 		});
 	}
+
+	/**
+	 * Returns a list of sockets in a room that can and can't know about a station
+	 *
+	 * @param {object} payload - the payload object
+	 * @param {object} payload.station - the station object
+	 * @param {string} payload.room - the socket.io room to get the sockets from
+	 * @returns {Promise} - returns a promise (resolve, reject)
+	 */
+	GET_SOCKETS_THAT_CAN_KNOW_ABOUT_STATION(payload) {
+		return new Promise((resolve, reject) => {
+			IOModule.runJob("GET_ROOM_SOCKETS", { room: payload.room }, this)
+				.then(socketsObject => {
+					const sockets = Object.keys(socketsObject).map(socketKey => socketsObject[socketKey]);
+					let socketsThatCan = [];
+					const socketsThatCannot = [];
+
+					if (payload.station.privacy === "public") {
+						socketsThatCan = sockets;
+						resolve({ socketsThatCan, socketsThatCannot });
+					} else {
+						async.eachLimit(
+							sockets,
+							1,
+							(socket, next) => {
+								const { session } = socket;
+
+								async.waterfall(
+									[
+										next => {
+											if (!session.sessionId) next("No session id");
+											else next();
+										},
+
+										next => {
+											CacheModule.runJob(
+												"HGET",
+												{
+													table: "sessions",
+													key: session.sessionId
+												},
+												this
+											)
+												.then(response => {
+													next(null, response);
+												})
+												.catch(next);
+										},
+
+										(session, next) => {
+											if (!session) next("No session");
+											else {
+												DBModule.runJob("GET_MODEL", { modelName: "user" }, this)
+													.then(userModel => {
+														next(null, userModel);
+													})
+													.catch(next);
+											}
+										},
+
+										(userModel, next) => {
+											if (!userModel) next("No user model");
+											else
+												userModel.findOne(
+													{
+														_id: session.userId
+													},
+													next
+												);
+										},
+
+										(user, next) => {
+											if (!user) next("No user found");
+											else if (user.role === "admin") {
+												socketsThatCan.push(socket);
+												next();
+											} else if (
+												payload.station.type === "community" &&
+												payload.station.owner === session.userId
+											) {
+												socketsThatCan.push(socket);
+												next();
+											}
+										}
+									],
+									err => {
+										if (err) socketsThatCannot.push(socket);
+										next();
+									}
+								);
+							},
+							err => {
+								if (err) reject(err);
+								else resolve({ socketsThatCan, socketsThatCannot });
+							}
+						);
+					}
+				})
+				.catch(reject);
+		});
+	}
 }
 
 export default new _StationsModule();

+ 21 - 13
backend/logic/youtube.js

@@ -10,7 +10,12 @@ let YouTubeModule;
 class _YouTubeModule extends CoreClass {
 	// eslint-disable-next-line require-jsdoc
 	constructor() {
-		super("youtube");
+		super("youtube", {
+			concurrency: 1,
+			priorities: {
+				GET_PLAYLIST: 11
+			}
+		});
 
 		YouTubeModule = this;
 	}
@@ -135,17 +140,20 @@ class _YouTubeModule extends CoreClass {
 								next(null, nextPageToken !== undefined);
 							},
 							next => {
-								YouTubeModule.runJob("GET_PLAYLIST_PAGE", { playlistId, nextPageToken }, this)
-									// eslint-disable-next-line no-loop-func
-									.then(response => {
-										songs = songs.concat(response.songs);
-										nextPageToken = response.nextPageToken;
-										next();
-									})
-									// eslint-disable-next-line no-loop-func
-									.catch(err => {
-										next(err);
-									});
+								// Add 250ms delay between each job request
+								setTimeout(() => {
+									YouTubeModule.runJob("GET_PLAYLIST_PAGE", { playlistId, nextPageToken }, this)
+										// eslint-disable-next-line no-loop-func
+										.then(response => {
+											songs = songs.concat(response.songs);
+											nextPageToken = response.nextPageToken;
+											next();
+										})
+										// eslint-disable-next-line no-loop-func
+										.catch(err => {
+											next(err);
+										});
+								}, 250);
 							},
 							err => {
 								next(err, songs);
@@ -277,7 +285,7 @@ class _YouTubeModule extends CoreClass {
 					}
 				});
 
-				return YouTubeModule.runJob("FILTER_MUSIC_VIDEOS", { videoIds: payload.videoIds, page: page + 1 })
+				return YouTubeModule.runJob("FILTER_MUSIC_VIDEOS", { videoIds: payload.videoIds, page: page + 1 }, this)
 					.then(result => {
 						resolve({ songIds: songIds.concat(result.songIds) });
 					})

+ 88 - 7
frontend/src/pages/Home/index.vue

@@ -225,14 +225,35 @@ export default {
 			});
 			this.socket.on("event:stations.created", res => {
 				const station = res;
+				if (
+					this.stations.find(_station => _station._id === station._id)
+				) {
+					this.stations.forEach(s => {
+						const _station = s;
+						if (_station._id === station._id) {
+							_station.privacy = station.privacy;
+						}
+					});
+				} else {
+					if (!station.currentSong)
+						station.currentSong = {
+							thumbnail: "/assets/notes-transparent.png"
+						};
+					if (station.currentSong && !station.currentSong.thumbnail)
+						station.currentSong.ytThumbnail = `https://img.youtube.com/vi/${station.currentSong.songId}/mqdefault.jpg`;
+					this.stations.push(station);
+				}
+			});
 
-				if (!station.currentSong)
-					station.currentSong = {
-						thumbnail: "/assets/notes-transparent.png"
-					};
-				if (station.currentSong && !station.currentSong.thumbnail)
-					station.currentSong.ytThumbnail = `https://img.youtube.com/vi/${station.currentSong.songId}/mqdefault.jpg`;
-				this.stations.push(station);
+			this.socket.on("event:station.removed", response => {
+				const { stationId } = response;
+				const station = this.stations.find(
+					station => station._id === stationId
+				);
+				if (station) {
+					const stationIndex = this.stations.indexOf(station);
+					this.stations.splice(stationIndex, 1);
+				}
 			});
 
 			this.socket.on(
@@ -247,6 +268,46 @@ export default {
 				}
 			);
 
+			this.socket.on("event:station.updatePrivacy", response => {
+				const { stationId, privacy } = response;
+				this.stations.forEach(s => {
+					const station = s;
+					if (station._id === stationId) {
+						station.privacy = privacy;
+					}
+				});
+			});
+
+			this.socket.on("event:station.updateName", response => {
+				const { stationId, name } = response;
+				this.stations.forEach(s => {
+					const station = s;
+					if (station._id === stationId) {
+						station.name = name;
+					}
+				});
+			});
+
+			this.socket.on("event:station.updateDisplayName", response => {
+				const { stationId, displayName } = response;
+				this.stations.forEach(s => {
+					const station = s;
+					if (station._id === stationId) {
+						station.displayName = displayName;
+					}
+				});
+			});
+
+			this.socket.on("event:station.updateDescription", response => {
+				const { stationId, description } = response;
+				this.stations.forEach(s => {
+					const station = s;
+					if (station._id === stationId) {
+						station.description = description;
+					}
+				});
+			});
+
 			this.socket.on("event:station.nextSong", (stationId, song) => {
 				let newSong = song;
 				this.stations.forEach(s => {
@@ -263,6 +324,26 @@ export default {
 				});
 			});
 
+			this.socket.on("event:station.pause", response => {
+				const { stationId } = response;
+				this.stations.forEach(s => {
+					const station = s;
+					if (station._id === stationId) {
+						station.paused = true;
+					}
+				});
+			});
+
+			this.socket.on("event:station.resume", response => {
+				const { stationId } = response;
+				this.stations.forEach(s => {
+					const station = s;
+					if (station._id === stationId) {
+						station.paused = false;
+					}
+				});
+			});
+
 			this.socket.on("event:user.favoritedStation", stationId => {
 				this.favoriteStations.push(stationId);
 			});