Преглед на файлове

Further improved job system

Kristian Vos преди 4 години
родител
ревизия
17b027fde6
променени са 3 файла, в които са добавени 139 реда и са изтрити 67 реда
  1. 123 57
      backend/core.js
  2. 14 8
      backend/index.js
  3. 2 2
      backend/logic/playlists.js

+ 123 - 57
backend/core.js

@@ -1,4 +1,3 @@
-import async from "async";
 import config from "config";
 
 class DeferredPromise {
@@ -25,7 +24,8 @@ class Queue {
 		this.handleTaskFunction = handleTaskFunction;
 		this.concurrency = concurrency;
 		this.queue = [];
-		this.running = [];
+		this.runningTasks = [];
+		this.pausedTasks = [];
 		this.paused = false;
 	}
 
@@ -59,7 +59,7 @@ class Queue {
 	 * @returns {number} - amount of running jobs
 	 */
 	lengthRunning() {
-		return this.running.length;
+		return this.runningTasks.length;
 	}
 
 	/**
@@ -81,17 +81,32 @@ class Queue {
 	 * @param {object} job - the job to be removed
 	 */
 	removeRunningJob(job) {
-		this.running.remove(this.running.find(task => task.job.toString() === job.toString()));
+		this.runningTasks.remove(this.runningTasks.find(task => task.job.toString() === job.toString()));
+	}
+
+	pauseRunningJob(job) {
+		const task = this.runningTasks.find(task => task.job.toString() === job.toString());
+		this.runningTasks.remove(task);
+		this.pausedTasks.push(task);
+	}
+
+	resumeRunningJob(job) {
+		const task = this.pausedTasks.find(task => task.job.toString() === job.toString());
+		this.pausedTasks.remove(task);
+		this.queue.unshift(task);
+		setTimeout(() => {
+			this._handleQueue();
+		}, 0);
 	}
 
 	/**
 	 * Check if there's room for a job to be processed, and if there is, run it.
 	 */
 	_handleQueue() {
-		if (!this.paused && this.running.length < this.concurrency && this.queue.length > 0) {
+		if (!this.paused && this.runningTasks.length < this.concurrency && this.queue.length > 0) {
 			const task = this.queue.reduce((a, b) => (a.priority < b.priority ? b : a));
 			this.queue.remove(task);
-			this.running.push(task);
+			this.runningTasks.push(task);
 			this._handleTask(task);
 			setTimeout(() => {
 				this._handleQueue();
@@ -101,7 +116,7 @@ class Queue {
 
 	_handleTask(task) {
 		this.handleTaskFunction(task.job).finally(() => {
-			this.running.remove(task);
+			this.runningTasks.remove(task);
 			this._handleQueue();
 		});
 	}
@@ -112,6 +127,8 @@ class Job {
 	constructor(name, payload, onFinish, module, parentJob) {
 		this.name = name;
 		this.payload = payload;
+		this.response = null;
+		this.responseType = null;
 		this.onFinish = onFinish;
 		this.module = module;
 		this.parentJob = parentJob;
@@ -130,12 +147,21 @@ class Job {
 	}
 
 	setStatus(status) {
+		// console.log(`Job ${this.toString()} has changed status from ${this.status} to ${status}`);
 		this.status = status;
 	}
 
 	toString() {
 		return this.uniqueId;
 	}
+
+	setResponse(response) {
+		this.response = response;
+	}
+
+	setResponseType(responseType) {
+		this.responseType = responseType;
+	}
 }
 
 class MovingAverageCalculator {
@@ -315,14 +341,11 @@ export default class CoreClass {
 		if (parentJob) {
 			this.log(
 				"INFO",
-				`Removing job ${
-					parentJob.name
-				} (${parentJob.toString()}) from running jobs since a child job has to run first`
+				`Pausing job ${parentJob.name} (${parentJob.toString()}) since a child job has to run first`
 			);
 			parentJob.addChildJob(job);
 			parentJob.setStatus("WAITING_ON_CHILD_JOB");
-			parentJob.module.runningJobs.remove(job);
-			parentJob.module.jobQueue.removeRunningJob(job);
+			parentJob.module.jobQueue.pauseRunningJob(parentJob);
 			// console.log(111, parentJob.module.jobQueue.length());
 			// console.log(
 			// 	222,
@@ -379,56 +402,99 @@ export default class CoreClass {
 		return new Promise((resolve, reject) => {
 			const startTime = Date.now();
 
+			const previousStatus = job.status;
 			job.setStatus("RUNNING");
 			this.runningJobs.push(job);
 
-			this[job.name]
-				.apply(job, [job.payload])
-				.then(response => {
-					// if (!options.isQuiet)
-					this.log("INFO", `Ran job ${job.name} (${job.toString()}) successfully`);
-					job.setStatus("FINISHED");
-					this.jobStatistics[job.name].successful += 1;
-					if (
-						config.debug &&
-						config.debug.stationIssue === true &&
-						config.debug.captureJobs &&
-						config.debug.captureJobs.indexOf(job.name) !== -1
-					) {
-						this.moduleManager.debugJobs.completed.push({
-							status: "success",
-							job,
-							response
-						});
-					}
-					job.onFinish.resolve(response);
-				})
-				.catch(error => {
-					this.log("INFO", `Running job ${job.name} (${job.toString()}) failed`);
-					job.setStatus("FINISHED");
-					this.jobStatistics[job.name].failed += 1;
-					if (
-						config.debug &&
-						config.debug.stationIssue === true &&
-						config.debug.captureJobs &&
-						config.debug.captureJobs.indexOf(job.name) !== -1
-					) {
-						this.moduleManager.debugJobs.completed.push({
-							status: "error",
-							job,
-							error
-						});
+			if (previousStatus === "QUEUED") {
+				this.log("INFO", `Job ${job.name} (${job.toString()}) is queued, so calling it`);
+				this[job.name]
+					.apply(job, [job.payload])
+					.then(response => {
+						// if (!options.isQuiet)
+						this.log("INFO", `Ran job ${job.name} (${job.toString()}) successfully`);
+						job.setStatus("FINISHED");
+						job.setResponse(response);
+						this.jobStatistics[job.name].successful += 1;
+						job.setResponseType("RESOLVE");
+						if (
+							config.debug &&
+							config.debug.stationIssue === true &&
+							config.debug.captureJobs &&
+							config.debug.captureJobs.indexOf(job.name) !== -1
+						) {
+							this.moduleManager.debugJobs.completed.push({
+								status: "success",
+								job,
+								response
+							});
+						}
+						// job.onFinish.resolve(response);
+					})
+					.catch(error => {
+						this.log("INFO", `Running job ${job.name} (${job.toString()}) failed`);
+						job.setStatus("FINISHED");
+						job.setResponse(error);
+						job.setResponseType("REJECT");
+						this.jobStatistics[job.name].failed += 1;
+						if (
+							config.debug &&
+							config.debug.stationIssue === true &&
+							config.debug.captureJobs &&
+							config.debug.captureJobs.indexOf(job.name) !== -1
+						) {
+							this.moduleManager.debugJobs.completed.push({
+								status: "error",
+								job,
+								error
+							});
+						}
+						// job.onFinish.reject(error);
+					})
+					.finally(() => {
+						const endTime = Date.now();
+						const executionTime = endTime - startTime;
+						this.jobStatistics[job.name].total += 1;
+						this.jobStatistics[job.name].averageTiming.update(executionTime);
+						this.runningJobs.splice(this.runningJobs.indexOf(job), 1);
+						if (!job.parentJob) {
+							if (job.responseType === "RESOLVE") {
+								job.onFinish.resolve(job.response);
+								job.responseType = "RESOLVED";
+							} else if (job.responseType === "REJECT") {
+								job.onFinish.reject(job.response);
+								job.responseType = "REJECTED";
+							}
+						} else if (
+							job.parentJob &&
+							job.parentJob.childJobs.find(childJob => childJob.status !== "FINISHED") === undefined
+						) {
+							this.log(
+								"INFO",
+								`Requeing/resuming job ${
+									job.parentJob.name
+								} (${job.parentJob.toString()}) since all child jobs are complete.`
+							);
+							job.parentJob.setStatus("REQUEUED");
+							job.parentJob.module.jobQueue.resumeRunningJob(job.parentJob);
+						}
+						resolve();
+					});
+			} else {
+				this.log(
+					"INFO",
+					`Job ${job.name} (${job.toString()}) is re-queued, so resolving/rejecting all child jobs.`
+				);
+				job.childJobs.forEach(childJob => {
+					if (childJob.responseType === "RESOLVE") {
+						childJob.onFinish.resolve(childJob.payload);
+						childJob.responseType = "RESOLVED";
+					} else if (childJob.responseType === "REJECT") {
+						childJob.onFinish.reject(childJob.payload);
+						childJob.responseType = "REJECTED";
 					}
-					job.onFinish.reject(error);
-				})
-				.finally(() => {
-					const endTime = Date.now();
-					const executionTime = endTime - startTime;
-					this.jobStatistics[job.name].total += 1;
-					this.jobStatistics[job.name].averageTiming.update(executionTime);
-					this.runningJobs.splice(this.runningJobs.indexOf(job), 1);
-					resolve();
 				});
+			}
 		});
 	}
 }

+ 14 - 8
backend/index.js

@@ -420,27 +420,33 @@ process.stdin.on("data", data => {
 	if (command === "status") {
 		console.log("Status:");
 
-		for (
-			let moduleName = 0, moduleKeys = Object.keys(moduleManager.modules);
-			moduleName < moduleKeys.length;
-			moduleName += 1
-		) {
+		Object.keys(moduleManager.modules).forEach(moduleName => {
 			const module = moduleManager.modules[moduleName];
 			const tabsNeeded = 4 - Math.ceil((moduleName.length + 1) / 8);
 			console.log(
 				`${moduleName.toUpperCase()}${Array(tabsNeeded).join(
 					"\t"
-				)}${module.getStatus()}. Jobs in queue: ${module.jobQueue.length()}. Jobs in progress: ${module.jobQueue.running()}. Concurrency: ${
+				)}${module.getStatus()}. Jobs in queue: ${module.jobQueue.lengthQueue()}. Jobs in progress: ${module.jobQueue.lengthRunning()}. Concurrency: ${
 					module.jobQueue.concurrency
 				}. Stage: ${module.getStage()}`
 			);
-		}
+		});
 		// moduleManager._lockdown();
 	}
 	if (command.startsWith("running")) {
 		const parts = command.split(" ");
 
-		console.log(moduleManager.modules[parts[1]].runningJobs);
+		console.log(moduleManager.modules[parts[1]].jobQueue.runningTasks);
+	}
+	if (command.startsWith("queued")) {
+		const parts = command.split(" ");
+
+		console.log(moduleManager.modules[parts[1]].jobQueue.queue);
+	}
+	if (command.startsWith("paused")) {
+		const parts = command.split(" ");
+
+		console.log(moduleManager.modules[parts[1]].jobQueue.pausedTasks);
 	}
 	if (command.startsWith("stats")) {
 		const parts = command.split(" ");

+ 2 - 2
backend/logic/playlists.js

@@ -54,7 +54,7 @@ class _PlaylistsModule extends CoreClass {
 						return async.each(
 							playlistIds,
 							(playlistId, next) => {
-								playlistModel.findOne({ _id: playlistId }, (err, playlist) => {
+								PlaylistsModule.playlistModel.findOne({ _id: playlistId }, (err, playlist) => {
 									if (err) next(err);
 									else if (!playlist) {
 										CacheModule.runJob("HDEL", {
@@ -72,7 +72,7 @@ class _PlaylistsModule extends CoreClass {
 
 					next => {
 						this.setStage(5);
-						playlistModel.find({}, next);
+						PlaylistsModule.playlistModel.find({}, next);
 					},
 
 					(playlists, next) => {