Browse Source

Improved memory with backend job system

Kristian Vos 3 years ago
parent
commit
d6b189a5ba

+ 10 - 3
backend/core.js

@@ -244,6 +244,13 @@ class Job {
 		this.responseType = responseType;
 	}
 
+	/**
+	 * Removes child jobs to prevent memory leak
+	 */
+	cleanup() {
+		this.childJobs = this.childJobs.map(() => null);
+	}
+
 	/**
 	 * Logs to the module of the job
 	 *
@@ -306,7 +313,6 @@ export default class CoreClass {
 		this.concurrency = options && options.concurrency ? options.concurrency : 10;
 		this.jobQueue = new Queue((job, options) => this._runJob(job, options), this.concurrency);
 		this.jobQueue.pause();
-		this.runningJobs = [];
 		this.priorities = options && options.priorities ? options.priorities : {};
 		this.stage = 0;
 		this.jobStatistics = {};
@@ -554,7 +560,7 @@ export default class CoreClass {
 
 			const previousStatus = job.status;
 			job.setStatus("RUNNING");
-			this.runningJobs.push(job);
+			this.moduleManager.jobManager.addJob(job);
 
 			if (previousStatus === "QUEUED") {
 				if (!options.isQuiet) this.log("INFO", `Job ${job.name} (${job.toString()}) is queued, so calling it`);
@@ -606,7 +612,8 @@ export default class CoreClass {
 						const executionTime = endTime - startTime;
 						this.jobStatistics[job.name].total += 1;
 						this.jobStatistics[job.name].averageTiming.update(executionTime);
-						this.runningJobs.splice(this.runningJobs.indexOf(job), 1);
+						this.moduleManager.jobManager.removeJob(job);
+						job.cleanup();
 
 						if (!job.parentJob) {
 							if (job.responseType === "RESOLVE") {

+ 35 - 19
backend/index.js

@@ -224,11 +224,36 @@ if (config.debug && config.debug.traceUnhandledPromises === true) {
 // 	}
 // }
 
+class JobManager {
+	constructor() {
+		this.runningJobs = {};
+	}
+
+	addJob(job) {
+		if (!this.runningJobs[job.module.name]) this.runningJobs[job.module.name] = {};
+		this.runningJobs[job.module.name][job.toString()] = job;
+	}
+
+	removeJob(job) {
+		if (!this.runningJobs[job.module.name]) this.runningJobs[job.module.name] = {};
+		delete this.runningJobs[job.module.name][job.toString()];
+	}
+
+	getJob(uuid) {
+		let job = null;
+		Object.keys(this.runningJobs).forEach(moduleName => {
+			if (this.runningJobs[moduleName][uuid]) job = this.runningJobs[moduleName][uuid];
+		});
+		return job;
+	}
+}
+
 class ModuleManager {
 	// eslint-disable-next-line require-jsdoc
 	constructor() {
 		this.modules = {};
 		this.modulesNotInitialized = [];
+		this.jobManager = new JobManager();
 		this.i = 0;
 		this.lockdown = false;
 		this.fancyConsole = fancyConsole;
@@ -318,8 +343,7 @@ class ModuleManager {
 
 			this.log(
 				"INFO",
-				`Initialized: ${Object.keys(this.modules).length - this.modulesNotInitialized.length}/${
-					Object.keys(this.modules).length
+				`Initialized: ${Object.keys(this.modules).length - this.modulesNotInitialized.length}/${Object.keys(this.modules).length
 				}.`
 			);
 
@@ -404,10 +428,12 @@ moduleManager.initialize();
  */
 function printJob(job, layer) {
 	const tabs = Array(layer).join("\t");
-	console.log(`${tabs}${job.name} (${job.toString()}) ${job.status}`);
-	job.childJobs.forEach(childJob => {
-		printJob(childJob, layer + 1);
-	});
+	if (job) {
+		console.log(`${tabs}${job.name} (${job.toString()}) ${job.status}`);
+		job.childJobs.forEach(childJob => {
+			printJob(childJob, layer + 1);
+		});
+	} else console.log(`${tabs}JOB WAS REMOVED`);
 }
 
 /**
@@ -439,8 +465,7 @@ process.stdin.on("data", data => {
 			console.log(
 				`${moduleName.toUpperCase()}${Array(tabsNeeded).join(
 					"\t"
-				)}${module.getStatus()}. Jobs in queue: ${module.jobQueue.lengthQueue()}. Jobs in progress: ${module.jobQueue.lengthRunning()}. Jobs paused: ${module.jobQueue.lengthPaused()} Concurrency: ${
-					module.jobQueue.concurrency
+				)}${module.getStatus()}. Jobs in queue: ${module.jobQueue.lengthQueue()}. Jobs in progress: ${module.jobQueue.lengthRunning()}. Jobs paused: ${module.jobQueue.lengthPaused()} Concurrency: ${module.jobQueue.concurrency
 				}. Stage: ${module.getStage()}`
 			);
 		});
@@ -476,17 +501,8 @@ process.stdin.on("data", data => {
 		const parts = command.split(" ");
 
 		const uuid = parts[1];
-		let jobFound = null;
+		let jobFound = moduleManager.jobManager.getJob(uuid);
 
-		Object.keys(moduleManager.modules).forEach(moduleName => {
-			const module = moduleManager.modules[moduleName];
-			const task1 = module.jobQueue.runningTasks.find(task => task.job.uniqueId === uuid);
-			const task2 = module.jobQueue.queue.find(task => task.job.uniqueId === uuid);
-			const task3 = module.jobQueue.pausedTasks.find(task => task.job.uniqueId === uuid);
-			if (task1) jobFound = task1.job;
-			if (task2) jobFound = task2.job;
-			if (task3) jobFound = task3.job;
-		});
 
 		if (jobFound) {
 			let topParent = jobFound;
@@ -500,7 +516,7 @@ process.stdin.on("data", data => {
 			);
 			console.log(jobFound);
 			printJob(topParent, 1);
-		} else console.log("Could not find job in any running, queued or paused lists in any module.");
+		} else console.log("Could not find job in job manager.");
 	}
 	if (command.startsWith("runjob")) {
 		const parts = command.split(" ");

+ 7 - 7
backend/logic/actions/songs.js

@@ -332,7 +332,7 @@ export default {
 		async.waterfall(
 			[
 				next => {
-					SongsModule.runJob("UPDATE_ALL_SONGS", {})
+					SongsModule.runJob("UPDATE_ALL_SONGS", {}, this)
 						.then(() => {
 							next();
 						})
@@ -499,8 +499,8 @@ export default {
 								.filter((value, index, self) => self.indexOf(value) === index)
 								.forEach(genre => {
 									PlaylistsModule.runJob("AUTOFILL_GENRE_PLAYLIST", { genre })
-										.then(() => {})
-										.catch(() => {});
+										.then(() => { })
+										.catch(() => { });
 								});
 
 							next(null, song);
@@ -776,8 +776,8 @@ export default {
 				(song, next) => {
 					song.genres.forEach(genre => {
 						PlaylistsModule.runJob("AUTOFILL_GENRE_PLAYLIST", { genre })
-							.then(() => {})
-							.catch(() => {});
+							.then(() => { })
+							.catch(() => { });
 					});
 
 					SongsModule.runJob("UPDATE_SONG", { songId: song._id });
@@ -844,8 +844,8 @@ export default {
 				(song, next) => {
 					song.genres.forEach(genre => {
 						PlaylistsModule.runJob("AUTOFILL_GENRE_PLAYLIST", { genre })
-							.then(() => {})
-							.catch(() => {});
+							.then(() => { })
+							.catch(() => { });
 					});
 
 					SongsModule.runJob("UPDATE_SONG", { songId });

+ 13 - 5
backend/logic/playlists.js

@@ -554,14 +554,22 @@ class _PlaylistsModule extends CoreClass {
 					// },
 
 					(playlistId, next) => {
-						StationsModule.runJob("GET_STATIONS_THAT_INCLUDE_OR_EXCLUDE_PLAYLIST", { playlistId })
+						StationsModule.runJob("GET_STATIONS_THAT_INCLUDE_OR_EXCLUDE_PLAYLIST", { playlistId }, this)
 							.then(response => {
-								response.stationIds.forEach(stationId => {
-									PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }).then().catch();
+								async.eachLimit(response.stationIds, 1, (stationId, next) => {
+									PlaylistsModule.runJob("AUTOFILL_STATION_PLAYLIST", { stationId }, this).then(() => {
+										next();
+									}).catch(err => {
+										next(err);
+									});
+								}, err => {
+									if (err) next(err);
+									else next();
 								});
 							})
-							.catch();
-						next();
+							.catch(err => {
+								next(err);
+							});
 					}
 				],
 				err => {

+ 78 - 18
backend/logic/songs.js

@@ -203,7 +203,7 @@ class _SongsModule extends CoreClass {
 						} else {
 							const status =
 								(!payload.userId && config.get("hideAnonymousSongs")) ||
-								(payload.automaticallyRequested && config.get("hideAutomaticallyRequestedSongs"))
+									(payload.automaticallyRequested && config.get("hideAutomaticallyRequestedSongs"))
 									? "hidden"
 									: "unverified";
 
@@ -291,7 +291,6 @@ class _SongsModule extends CoreClass {
 					},
 
 					(song, next) => {
-						next(null, song);
 						const { _id, youtubeId, title, artists, thumbnail, duration, status } = song;
 						const trimmedSong = {
 							_id,
@@ -302,25 +301,75 @@ class _SongsModule extends CoreClass {
 							duration,
 							status
 						};
-						this.log("INFO", `Going to update playlists and stations now for song ${_id}`);
-						DBModule.runJob("GET_MODEL", { modelName: "playlist" }).then(playlistModel => {
+						this.log("INFO", `Going to update playlists now for song ${_id}`);
+						DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this).then(playlistModel => {
 							playlistModel.updateMany(
 								{ "songs._id": song._id },
 								{ $set: { "songs.$": trimmedSong } },
 								err => {
-									if (err) this.log("ERROR", err);
+									if (err) next(err);
 									else
 										playlistModel.find({ "songs._id": song._id }, (err, playlists) => {
-											playlists.forEach(playlist => {
-												PlaylistsModule.runJob("UPDATE_PLAYLIST", {
-													playlistId: playlist._id
+											if (err) next(err);
+											else {
+												async.eachLimit(playlists, 1, (playlist, next) => {
+													PlaylistsModule.runJob("UPDATE_PLAYLIST", {
+														playlistId: playlist._id
+													}, this).then(() => {
+														next();
+													}).catch(err => {
+														next(err);
+													});
+												}, err => {
+													if (err) next(err);
+													else next(null, song)
 												});
-											});
+											}
+											// playlists.forEach(playlist => {
+											// 	PlaylistsModule.runJob("UPDATE_PLAYLIST", {
+											// 		playlistId: playlist._id
+											// 	});
+											// });
 										});
 								}
 							);
+						}).catch(err => {
+							next(err);
 						});
-						DBModule.runJob("GET_MODEL", { modelName: "station" }).then(stationModel => {
+					},
+
+					(song, next) => {
+						// next(null, song);
+						const { _id, youtubeId, title, artists, thumbnail, duration, status } = song;
+						// const trimmedSong = {
+						// 	_id,
+						// 	youtubeId,
+						// 	title,
+						// 	artists,
+						// 	thumbnail,
+						// 	duration,
+						// 	status
+						// };
+						// this.log("INFO", `Going to update playlists and stations now for song ${_id}`);
+						// DBModule.runJob("GET_MODEL", { modelName: "playlist" }).then(playlistModel => {
+						// 	playlistModel.updateMany(
+						// 		{ "songs._id": song._id },
+						// 		{ $set: { "songs.$": trimmedSong } },
+						// 		err => {
+						// 			if (err) this.log("ERROR", err);
+						// 			else
+						// 				playlistModel.find({ "songs._id": song._id }, (err, playlists) => {
+						// 					playlists.forEach(playlist => {
+						// 						PlaylistsModule.runJob("UPDATE_PLAYLIST", {
+						// 							playlistId: playlist._id
+						// 						});
+						// 					});
+						// 				});
+						// 		}
+						// 	);
+						// });
+						this.log("INFO", `Going to update stations now for song ${_id}`);
+						DBModule.runJob("GET_MODEL", { modelName: "station" }, this).then(stationModel => {
 							stationModel.updateMany(
 								{ "queue._id": song._id },
 								{
@@ -337,12 +386,24 @@ class _SongsModule extends CoreClass {
 									if (err) this.log("ERROR", err);
 									else
 										stationModel.find({ "queue._id": song._id }, (err, stations) => {
-											stations.forEach(station => {
-												StationsModule.runJob("UPDATE_STATION", { stationId: station._id });
-											});
+											if (err) next(err);
+											else {
+												async.eachLimit(stations, 1, (station, next) => {
+													StationsModule.runJob("UPDATE_STATION", { stationId: station._id }, this).then(() => {
+														next();
+													}).catch(err => {
+														next(err);
+													});
+												}, err => {
+													if (err) next(err);
+													else next(null, song);
+												});
+											}
 										});
 								}
 							);
+						}).catch(err => {
+							next(err);
 						});
 					},
 
@@ -381,7 +442,6 @@ class _SongsModule extends CoreClass {
 			async.waterfall(
 				[
 					next => {
-						return next("Currently disabled since it's broken due to the backend memory leak issue.");
 						SongsModule.SongModel.find({}, next);
 					},
 
@@ -390,11 +450,11 @@ class _SongsModule extends CoreClass {
 						const { length } = songs;
 						async.eachLimit(
 							songs,
-							10,
+							2,
 							(song, next) => {
 								index += 1;
 								console.log(`Updating song #${index} out of ${length}: ${song._id}`);
-								SongsModule.runJob("UPDATE_SONG", { songId: song._id }, this, 9)
+								SongsModule.runJob("UPDATE_SONG", { songId: song._id }, this)
 									.then(() => {
 										next();
 									})
@@ -464,7 +524,7 @@ class _SongsModule extends CoreClass {
 	// 										}
 	// 									}
 	// 								);
-									
+
 	// 							}
 	// 						});
 	// 					});
@@ -480,7 +540,7 @@ class _SongsModule extends CoreClass {
 	// 									else {
 	// 										stations.forEach(station => {
 	// 											StationsModule.runJob("UPDATE_STATION", { stationId: station._id });
-	// 										});	
+	// 										});
 	// 									}
 	// 								}
 	// 							);

+ 3 - 2
frontend/src/pages/Admin/tabs/VerifiedSongs.vue

@@ -27,7 +27,7 @@
 			>
 				Keyboard shortcuts helper
 			</button>
-			<!-- <confirm placement="bottom" @confirm="updateAllSongs()">
+			<confirm placement="bottom" @confirm="updateAllSongs()">
 				<button
 					class="button is-danger"
 					content="Update all songs"
@@ -35,7 +35,7 @@
 				>
 					Update all songs
 				</button>
-			</confirm> -->
+			</confirm>
 			<br />
 			<div>
 				<input
@@ -363,6 +363,7 @@ export default {
 			});
 		},
 		updateAllSongs() {
+			new Toast("Updating all songs, this could take a very long time.");
 			this.socket.dispatch("songs.updateAll", res => {
 				if (res.status === "success") new Toast(res.message);
 				else new Toast(res.message);