Przeglądaj źródła

refactor: worked on nesting job queues in jobs for child jobs

Kristian Vos 2 lat temu
rodzic
commit
d2781bdbae

+ 1 - 1
backend/src/BaseModule.ts

@@ -100,7 +100,7 @@ export default abstract class BaseModule {
 		this.moduleManager.logBook.log({
 			message,
 			type,
-			category: `modules`,
+			category: `modules.${this.getName()}`,
 			data: {
 				moduleName: this.name,
 				...data

+ 57 - 2
backend/src/Job.ts

@@ -1,5 +1,10 @@
+import BaseModule from "./BaseModule";
+import JobQueue from "./JobQueue";
+import LogBook from "./LogBook";
+import ModuleManager from "./ModuleManager";
+import { JobOptions } from "./types/JobOptions";
 import { JobStatus } from "./types/JobStatus";
-import { Module } from "./types/Modules";
+import { Jobs, Module, Modules } from "./types/Modules";
 
 export default class Job {
 	protected name: string;
@@ -29,6 +34,12 @@ export default class Job {
 
 	protected startedAt: number;
 
+	protected moduleManager: ModuleManager;
+
+	protected logBook: LogBook;
+
+	protected jobQueue: JobQueue;
+
 	/**
 	 * Job
 	 *
@@ -41,13 +52,20 @@ export default class Job {
 		name: string,
 		module: Module,
 		callback: (job: Job, resolve: () => void, reject: () => void) => void,
-		options?: { priority: number; longJob?: string }
+		options: { priority: number; longJob?: string },
+		moduleManager: ModuleManager,
+		logBook: LogBook
 	) {
 		this.name = name;
 		this.module = module;
 		this.callback = callback;
 		this.priority = 1;
 
+		this.moduleManager = moduleManager;
+		this.logBook = logBook;
+
+		this.jobQueue = new JobQueue(moduleManager, logBook);
+
 		if (options) {
 			const { priority, longJob } = options;
 			if (priority) this.priority = priority;
@@ -113,6 +131,8 @@ export default class Job {
 	 */
 	public setStatus(status: JobStatus) {
 		this.status = status;
+		if (this.status === "ACTIVE") this.jobQueue.resume();
+		else if (this.status === "PAUSED") this.jobQueue.pause();
 	}
 
 	/**
@@ -124,6 +144,13 @@ export default class Job {
 		return this.module;
 	}
 
+	/**
+	 * Gets the job queue for jobs running under this current job
+	 */
+	public getJobQueue() {
+		return this.jobQueue;
+	}
+
 	/**
 	 * execute - Execute job
 	 *
@@ -135,4 +162,32 @@ export default class Job {
 			this.callback(this, resolve, reject);
 		});
 	}
+
+	/**
+	 * runJob - Run a job
+	 *
+	 * @param moduleName - Module name
+	 * @param jobName - Job name
+	 * @param params - Params
+	 */
+	public runJob<
+		ModuleNameType extends keyof Jobs & keyof Modules,
+		JobNameType extends keyof Jobs[ModuleNameType] &
+			keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
+		PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
+				? Record<string, never>
+				: Jobs[ModuleNameType][JobNameType]["payload"]
+			: Record<string, never>,
+		ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["returns"]
+			: never
+	>(
+		moduleName: ModuleNameType,
+		jobName: JobNameType,
+		payload: PayloadType,
+		options?: JobOptions
+	): Promise<ReturnType> {
+		return this.jobQueue.runJob(moduleName, jobName, payload, options);
+	}
 }

+ 8 - 21
backend/src/JobContext.ts

@@ -49,15 +49,11 @@ export default class JobContext {
 	}
 
 	/**
-	 * Runs a job in the context of an existing job, which by default runs jobs right away
+	 * runJob - Run a job
 	 *
-	 * @typeParam ModuleNameType - name of the module, which must exist
-	 * @typeParam JobNameType - name of the job, which must exist
-	 * @typeParam PayloadType - payload type based on the module and job, which is void if there is no payload
 	 * @param moduleName - Module name
 	 * @param jobName - Job name
-	 * @param payload - Job payload, if none then void
-	 * @param options - Job options
+	 * @param params - Params
 	 */
 	public runJob<
 		ModuleNameType extends keyof Jobs & keyof Modules,
@@ -67,25 +63,16 @@ export default class JobContext {
 			? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
 				? Record<string, never>
 				: Jobs[ModuleNameType][JobNameType]["payload"]
-			: Record<string, never>
+			: Record<string, never>,
+		ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["returns"]
+			: never
 	>(
 		moduleName: ModuleNameType,
 		jobName: JobNameType,
 		payload: PayloadType,
 		options?: JobOptions
-	) {
-		// If options doesn't exist, create it
-		const newOptions = options ?? {};
-		// If runDirectly is not set, set it to true
-		if (!Object.hasOwn(newOptions, "runDirectly"))
-			newOptions.runDirectly = true;
-
-		// Ask module manager to run the provided job
-		return this.moduleManager.runJob(
-			moduleName,
-			jobName,
-			payload,
-			newOptions
-		);
+	): Promise<ReturnType> {
+		return this.job.runJob(moduleName, jobName, payload, options);
 	}
 }

+ 140 - 15
backend/src/JobQueue.ts

@@ -1,11 +1,19 @@
+import BaseModule from "./BaseModule";
 import Job from "./Job";
+import JobContext from "./JobContext";
+import LogBook from "./LogBook";
+import ModuleManager from "./ModuleManager";
+import { JobOptions } from "./types/JobOptions";
 import { JobStatus } from "./types/JobStatus";
+import { Jobs, Modules } from "./types/Modules";
 
 export default class JobQueue {
 	private concurrency: number;
 
 	private isPaused: boolean;
 
+	private jobs: Job[];
+
 	private queue: Job[];
 
 	private active: Job[];
@@ -23,16 +31,23 @@ export default class JobQueue {
 
 	private processLock: boolean;
 
+	private moduleManager: ModuleManager;
+
+	private logBook: LogBook;
+
 	/**
 	 * Job Queue
 	 */
-	public constructor() {
+	public constructor(moduleManager: ModuleManager, logBook: LogBook) {
 		this.concurrency = 1;
 		this.isPaused = true;
+		this.jobs = [];
 		this.queue = [];
 		this.active = [];
 		this.stats = {};
 		this.processLock = false;
+		this.moduleManager = moduleManager;
+		this.logBook = logBook;
 	}
 
 	/**
@@ -40,12 +55,17 @@ export default class JobQueue {
 	 *
 	 * @param job - Job
 	 */
-	public add(job: Job) {
-		this.queue.push(job);
+	public add(job: Job, runDirectly: boolean) {
 		this.updateStats(job.getName(), "added");
-		setTimeout(() => {
-			this.process();
-		}, 0);
+		this.jobs.push(job);
+		if (runDirectly) {
+			this.executeJob(job);
+		} else {
+			this.queue.push(job);
+			setTimeout(() => {
+				this.process();
+			}, 0);
+		}
 	}
 
 	/**
@@ -54,11 +74,16 @@ export default class JobQueue {
 	 * @param jobId - Job UUID
 	 * @returns Job if found
 	 */
-	public getJob(jobId: string) {
-		return (
-			this.queue.find(job => job.getUuid() === jobId) ||
-			this.active.find(job => job.getUuid() === jobId)
-		);
+	public getJob(jobId: string, recursive = false) {
+		let job = this.jobs.find(job => job.getUuid() === jobId);
+		if (job || !recursive) return job;
+
+		this.jobs.some(currentJob => {
+			job = currentJob.getJobQueue().getJob(jobId, recursive);
+			return !!job;
+		});
+
+		return job;
 	}
 
 	/**
@@ -78,14 +103,107 @@ export default class JobQueue {
 		this.process();
 	}
 
+	/**
+	 * runJob - Run a job
+	 *
+	 * @param moduleName - Module name
+	 * @param jobName - Job name
+	 * @param params - Params
+	 */
+	public runJob<
+		ModuleNameType extends keyof Jobs & keyof Modules,
+		JobNameType extends keyof Jobs[ModuleNameType] &
+			keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
+		PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
+				? Record<string, never>
+				: Jobs[ModuleNameType][JobNameType]["payload"]
+			: Record<string, never>,
+		ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["returns"]
+			: never
+	>(
+		moduleName: ModuleNameType,
+		jobName: JobNameType,
+		payload: PayloadType,
+		options?: JobOptions
+	): Promise<ReturnType> {
+		return new Promise((resolve, reject) => {
+			const module = this.moduleManager.getModule(
+				moduleName
+			) as Modules[ModuleNameType];
+			if (!module) reject(new Error("Module not found."));
+			else {
+				const jobFunction = module[jobName];
+				if (!jobFunction || typeof jobFunction !== "function")
+					reject(new Error("Job not found."));
+				else if (
+					Object.prototype.hasOwnProperty.call(BaseModule, jobName)
+				)
+					reject(new Error("Illegal job function."));
+				else {
+					const job = new Job(
+						jobName.toString(),
+						module,
+						(job, resolveJob, rejectJob) => {
+							const jobContext = new JobContext(
+								this.moduleManager,
+								this.logBook,
+								job
+							);
+							jobFunction
+								.apply(module, [jobContext, payload])
+								.then((response: ReturnType) => {
+									this.logBook.log({
+										message: "Job completed successfully",
+										type: "success",
+										category: "jobs",
+										data: {
+											jobName: job.getName(),
+											jobId: job.getUuid()
+										}
+									});
+									resolveJob();
+									resolve(response);
+								})
+								.catch((err: any) => {
+									this.logBook.log({
+										message: `Job failed with error "${err}"`,
+										type: "error",
+										category: "jobs",
+										data: {
+											jobName: job.getName(),
+											jobId: job.getUuid()
+										}
+									});
+									rejectJob();
+									reject(err);
+								});
+						},
+						{
+							priority: (options && options.priority) || 10
+						},
+						this.moduleManager,
+						this.logBook
+					);
+
+					const runDirectly = !!(options && options.runDirectly);
+
+					this.add(job, runDirectly);
+				}
+			}
+		});
+	}
+
 	/**
 	 * Actually run a job function
 	 *
 	 * @param job - Initiated job
 	 */
-	public runJob(job: Job) {
+	public executeJob(job: Job) {
 		// Record when we started a job
 		const startTime = Date.now();
+		this.active.push(job);
 
 		job.execute()
 			.then(() => {
@@ -145,10 +263,9 @@ export default class JobQueue {
 
 			// Remove the job from the queue and add it to the active jobs array
 			this.queue.splice(this.queue.indexOf(job), 1);
-			this.active.push(job);
 
-			// Run the job
-			this.runJob(job);
+			// Execute the job
+			this.executeJob(job);
 
 			// Stop the for loop
 			break;
@@ -226,6 +343,14 @@ export default class JobQueue {
 		return status;
 	}
 
+	/**
+	 * Gets the job array
+	 *
+	 */
+	public getJobs() {
+		return this.jobs;
+	}
+
 	/**
 	 * updateStats - Update job statistics
 	 *

+ 9 - 9
backend/src/LogBook.ts

@@ -55,15 +55,15 @@ export default class LogBook {
 				data: false,
 				color: true,
 				exclude: [
-					// Success messages for jobs don't tend to be very helpful, so we exclude them by default
-					{
-						category: "jobs",
-						type: "success"
-					},
-					// We don't want to show debug messages in the console by default
-					{
-						type: "debug"
-					}
+					// // Success messages for jobs don't tend to be very helpful, so we exclude them by default
+					// {
+					// 	category: "jobs",
+					// 	type: "success"
+					// },
+					// // We don't want to show debug messages in the console by default
+					// {
+					// 	type: "debug"
+					// }
 				]
 			},
 			memory: {

+ 26 - 63
backend/src/ModuleManager.ts

@@ -20,7 +20,7 @@ export default class ModuleManager {
 	 */
 	public constructor(logBook: LogBook) {
 		this.logBook = logBook;
-		this.jobQueue = new JobQueue();
+		this.jobQueue = new JobQueue(this, logBook);
 	}
 
 	/**
@@ -63,6 +63,30 @@ export default class ModuleManager {
 		return this.jobQueue.getQueueStatus();
 	}
 
+	/**
+	 * Gets a job from the queue by jobId
+	 *
+	 * @returns Job
+	 */
+	public getJob(jobId: string, recursive = false) {
+		return this.jobQueue.getJob(jobId, recursive);
+	}
+
+	/**
+	 * Gets a list of all jobs running directly in the ModuleManager job queue
+	 */
+	public getJobs() {
+		return this.jobQueue.getJobs();
+	}
+
+	/**
+	 * Gets a module
+	 *
+	 */
+	public getModule(moduleName: keyof Modules) {
+		return this.modules && this.modules[moduleName];
+	}
+
 	/**
 	 * loadModule - Load and initialize module
 	 *
@@ -158,67 +182,6 @@ export default class ModuleManager {
 		payload: PayloadType,
 		options?: JobOptions
 	): Promise<ReturnType> {
-		return new Promise((resolve, reject) => {
-			const module = this.modules && this.modules[moduleName];
-			if (!module) reject(new Error("Module not found."));
-			else {
-				const jobFunction = module[jobName];
-				if (!jobFunction || typeof jobFunction !== "function")
-					reject(new Error("Job not found."));
-				else if (
-					Object.prototype.hasOwnProperty.call(BaseModule, jobName)
-				)
-					reject(new Error("Illegal job function."));
-				else {
-					const job = new Job(
-						jobName.toString(),
-						module,
-						(job, resolveJob, rejectJob) => {
-							const jobContext = new JobContext(
-								this,
-								this.logBook,
-								job
-							);
-							jobFunction
-								.apply(module, [jobContext, payload])
-								.then((response: ReturnType) => {
-									this.logBook.log({
-										message: "Job completed successfully",
-										type: "success",
-										category: "jobs",
-										data: {
-											jobName: job.getName(),
-											jobId: job.getUuid()
-										}
-									});
-									resolveJob();
-									resolve(response);
-								})
-								.catch((err: any) => {
-									this.logBook.log({
-										message: `Job failed with error "${err}"`,
-										type: "error",
-										category: "jobs",
-										data: {
-											jobName: job.getName(),
-											jobId: job.getUuid()
-										}
-									});
-									rejectJob();
-									reject(err);
-								});
-						},
-						{
-							priority: (options && options.priority) || 10
-						}
-					);
-
-					// If a job options.runDirectly is set to true, skip the queue and run a job directly
-					if (options && options.runDirectly)
-						this.jobQueue.runJob(job);
-					else this.jobQueue.add(job);
-				}
-			}
-		});
+		return this.jobQueue.runJob(moduleName, jobName, payload, options);
 	}
 }

+ 112 - 29
backend/src/main.ts

@@ -2,6 +2,7 @@ import * as readline from "node:readline";
 import { ObjectId } from "mongodb";
 import ModuleManager from "./ModuleManager";
 import LogBook from "./LogBook";
+import Job from "./Job";
 
 const logBook = new LogBook();
 
@@ -38,21 +39,21 @@ global.rs = () => {
 	process.exit();
 };
 
-// const interval = setInterval(() => {
-// 	moduleManager
-// 		.runJob("stations", "addToQueue", { songId: "TestId" })
-// 		.catch(() => {});
-// 	moduleManager
-// 		.runJob("stations", "addA", {}, { priority: 5 })
-// 		.catch(() => {});
-// 	moduleManager
-// 		.runJob("others", "doThing", { test: "Test", test2: 123 })
-// 		.catch(() => {});
-// }, 40);
-
-// setTimeout(() => {
-// 	clearTimeout(interval);
-// }, 3000);
+const interval = setInterval(() => {
+	moduleManager
+		.runJob("stations", "addToQueue", { songId: "TestId" })
+		.catch(() => {});
+	moduleManager
+		.runJob("stations", "addA", {}, { priority: 5 })
+		.catch(() => {});
+	// moduleManager
+	// 	.runJob("stations", "", { test: "Test", test2: 123 })
+	// 	.catch(() => {});
+}, 40);
+
+setTimeout(() => {
+	clearTimeout(interval);
+}, 3000);
 
 setTimeout(async () => {
 	const _id = "6371212daf4e9f8fb14444b2";
@@ -196,14 +197,14 @@ setTimeout(async () => {
 	// 	.then(console.log)
 	// 	.catch(console.error);
 
-	moduleManager
-		.runJob("data", "find", {
-			collection: "abc",
-			filter: { _id: new ObjectId(_id) },
-			limit: 1
-		})
-		.then(console.log)
-		.catch(console.error);
+	// moduleManager
+	// 	.runJob("data", "find", {
+	// 		collection: "abc",
+	// 		filter: { _id: new ObjectId(_id) },
+	// 		limit: 1
+	// 	})
+	// 	.then(console.log)
+	// 	.catch(console.error);
 }, 0);
 
 const rl = readline.createInterface({
@@ -236,18 +237,51 @@ process.on("SIGINT", shutdown);
 process.on("SIGQUIT", shutdown);
 process.on("SIGTERM", shutdown);
 
+type JobArray = [Job, JobArray[]];
+
+function getNestedChildJobs(job: Job): JobArray {
+	const jobs = job.getJobQueue().getJobs();
+
+	if (jobs.length > 0)
+		return [
+			job,
+			jobs.map((_job: Job) => getNestedChildJobs(_job))
+		] as JobArray;
+
+	return [job, []];
+}
+
+function getJobLines(
+	level: number,
+	[job, jobArrs]: JobArray,
+	seperator = "\t"
+): string[] {
+	const tabs = Array.from({ length: level })
+		.map(() => seperator)
+		.join("");
+	let lines = [
+		`${tabs}${job.getName()} (${job.getStatus()} - ${job.getPriority()} - ${job.getUuid()})`
+	];
+	jobArrs.forEach((jobArr: JobArray) => {
+		lines = [...lines, ...getJobLines(level + 1, jobArr, seperator)];
+	});
+
+	return lines;
+}
+
 const runCommand = (line: string) => {
 	const [command, ...args] = line.split(" ");
 	switch (command) {
 		case "help": {
 			console.log("Commands:");
-			console.log("status");
-			console.log("stats");
-			console.log("queue");
-			console.log("active");
-			console.log("eval");
+			console.log("status - Show module manager and job queue status");
+			console.log("stats - Shows jobs stats");
+			console.log("queue - Shows a table of all jobs in the queue");
+			console.log("active - Shows a table of all jobs currently running");
+			console.log("jobinfo <jobId> - Print all info about a job");
+			console.log("eval - Run a command");
 			console.log("debug");
-			console.log("log");
+			console.log("log - Change LogBook settings");
 			break;
 		}
 		case "status": {
@@ -281,6 +315,55 @@ const runCommand = (line: string) => {
 			console.table(activeStatus);
 			break;
 		}
+		case "jobinfo": {
+			if (args.length === 0) console.log("Please specify a jobId");
+			else {
+				const jobId = args[0];
+				const job = moduleManager.getJob(jobId, true);
+
+				if (!job) console.log("Job not found");
+				else {
+					const jobInfo = {
+						jobId: job?.getUuid(),
+						jobName: job?.getName(),
+						jobStatus: job?.getStatus(),
+						jobPriority: job?.getPriority(),
+						moduleName: job?.getModule().getName(),
+						moduleStatus: job?.getModule().getStatus()
+					};
+					console.table(jobInfo);
+
+					// Gets all child jobs of the current job, including the current job, nested
+					const jobArrs = getNestedChildJobs(job);
+
+					const jobLines = getJobLines(0, jobArrs);
+
+					jobLines.forEach(jobLine => {
+						console.log(jobLine);
+					});
+				}
+			}
+			break;
+		}
+		case "jobtree": {
+			const jobs = moduleManager.getJobs();
+
+			let jobLines: string[] = [];
+
+			jobs.forEach(job => {
+				// Gets all child jobs of the current job, including the current job, nested
+				const jobArrs = getNestedChildJobs(job);
+
+				jobLines = [...jobLines, ...getJobLines(0, jobArrs)];
+			});
+
+			console.log("List of jobs:");
+			jobLines.forEach(jobLine => {
+				console.log(jobLine);
+			});
+
+			break;
+		}
 		case "eval": {
 			const evalCommand = args.join(" ");
 			console.log(`Running eval command: ${evalCommand}`);

+ 6 - 5
backend/src/modules/StationModule.ts

@@ -29,11 +29,11 @@ export default class StationModule extends BaseModule {
 	 */
 	public async addToQueue(context: JobContext, payload: { songId: string }) {
 		const { songId } = payload;
-		// console.log(`Adding song ${songId} to the queue.`);
-		return new Promise(resolve => {
+		context.log(`Adding song ${songId} to the queue.`);
+		await new Promise((resolve, reject) => {
 			setTimeout(() => {
-				if (Math.round(Math.random())) throw new Error();
-				resolve(true);
+				if (Math.round(Math.random())) reject(new Error("Test321"));
+				else resolve(true);
 			}, Math.random() * 1000);
 		});
 	}
@@ -49,8 +49,9 @@ export default class StationModule extends BaseModule {
 		await context.runJob("stations", "addC", {});
 	}
 
-	public addC(context: JobContext) {
+	public async addC(context: JobContext) {
 		context.log("ADDC");
+		await new Promise(() => {});
 	}
 }