Browse Source

feat: worked on new module/job system with runJob, job context and more

Kristian Vos 2 years ago
parent
commit
4b9d71ee92

+ 6 - 2
.vscode/settings.json

@@ -1,3 +1,7 @@
 {
-    "editor.defaultFormatter": "rvest.vs-code-prettier-eslint"
-}
+	"editor.defaultFormatter": "rvest.vs-code-prettier-eslint",
+	"editor.formatOnSave": true,
+	"[typescript]": {
+		"editor.defaultFormatter": "esbenp.prettier-vscode"
+	}
+}

+ 22 - 0
.vscode/tasks.json

@@ -0,0 +1,22 @@
+{
+    // See https://go.microsoft.com/fwlink/?LinkId=733558
+    // for the documentation about the tasks.json format
+    "version": "2.0.0",
+    "tasks": [
+        {
+            "label": "musare.sh update",
+            "type": "shell",
+            "command": "./musare.sh update"
+        },
+        {
+            "label": "musare.sh start",
+            "type": "shell",
+            "command": "./musare.sh start"
+        },
+        {
+            "label": "musare.sh attach backend",
+            "type": "shell",
+            "command": "konsole . -e ./musare.sh attach backend & disown"
+        }
+    ]
+}

+ 2 - 1
backend/.eslintrc

@@ -45,6 +45,7 @@
 		"tsdoc/syntax": "warn",
 		"@typescript-eslint/no-empty-function": 0,
 		"@typescript-eslint/no-this-alias": 0,
-		"@typescript-eslint/no-non-null-assertion": 0
+		"@typescript-eslint/no-non-null-assertion": 0,
+		"no-void": 0
 	}
 }

+ 14 - 4
backend/src/Job.ts

@@ -1,9 +1,10 @@
 import { JobStatus } from "./types/JobStatus";
+import { Module } from "./types/Modules";
 
 export default class Job {
 	protected name: string;
 
-	protected module: string;
+	protected module: Module;
 
 	protected callback: (
 		job: this,
@@ -38,7 +39,7 @@ export default class Job {
 	 */
 	public constructor(
 		name: string,
-		module: string,
+		module: Module,
 		callback: (job: Job, resolve: () => void, reject: () => void) => void,
 		options?: { priority: number; longJob?: string }
 	) {
@@ -70,12 +71,12 @@ export default class Job {
 	}
 
 	/**
-	 * getName - Get job name
+	 * getName - Get module and job name in a dot format, e.g. module.jobName
 	 *
 	 * @returns module.name
 	 */
 	public getName(): string {
-		return `${this.module}.${this.name}`;
+		return `${this.module.getName()}.${this.name}`;
 	}
 
 	/**
@@ -114,6 +115,15 @@ export default class Job {
 		this.status = status;
 	}
 
+	/**
+	 * getModule - Get module
+	 *
+	 * @returns module
+	 */
+	public getModule(): Module {
+		return this.module;
+	}
+
 	/**
 	 * execute - Execute job
 	 *

+ 89 - 0
backend/src/JobContext.ts

@@ -0,0 +1,89 @@
+import { Jobs, Modules } from "./types/Modules";
+
+import Job from "./Job";
+import LogBook from "./LogBook";
+import ModuleManager from "./ModuleManager";
+import BaseModule from "./BaseModule";
+import { JobOptions } from "./types/JobOptions";
+
+export default class JobContext {
+	protected moduleManager: ModuleManager;
+
+	protected logBook: LogBook;
+
+	protected job: Job;
+
+	public constructor(
+		moduleManager: ModuleManager,
+		logBook: LogBook,
+		job: Job
+	) {
+		this.moduleManager = moduleManager;
+		this.logBook = logBook;
+		this.job = job;
+	}
+
+	/**
+	 * Log a message in the context of the current job, which automatically sets the category and data
+	 *
+	 * @param {string} message
+	 * @memberof JobContext
+	 */
+	public log(message: string) {
+		this.moduleManager.logBook.log({
+			message,
+			category: `${this.job.getModule().getName()}.${this.job.getName()}`,
+			data: {
+				moduleName: this.job.getModule().getName(),
+				jobName: this.job.getName()
+			}
+		});
+	}
+
+	/**
+	 * Runs a job in the context of an existing job, which by default runs jobs right away
+	 *
+	 * @template ModuleNameType name of the module, which must exist
+	 * @template JobNameType name of the job, which must exist
+	 * @template PayloadType payload type based on the module and job, which is void if there is no payload
+	 * @template ReturnType return type of the Promise, based on the module and job
+	 * @param {ModuleNameType} moduleName
+	 * @param {JobNameType} jobName
+	 * @param {PayloadType} payload
+	 * @param {JobOptions} [options]
+	 * @return {*}  {Promise<ReturnType>}
+	 * @memberof JobContext
+	 */
+	public runJob<
+		ModuleNameType extends keyof Jobs & keyof Modules,
+		JobNameType extends keyof Jobs[ModuleNameType] &
+			keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
+		PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
+				? void
+				: Jobs[ModuleNameType][JobNameType]["payload"]
+			: void,
+		ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["returns"]
+			: never
+	>(
+		moduleName: ModuleNameType,
+		jobName: JobNameType,
+		payload: PayloadType,
+		options?: JobOptions
+	): Promise<ReturnType> {
+		// If options doesn't exist, create it
+		const newOptions = options ?? {};
+		// If runDirectly is not set, set it to true
+		if (!Object.hasOwn(newOptions, "runDirectly"))
+			newOptions.runDirectly = true;
+
+		// Ask module manager to run the provided job
+		return this.moduleManager.runJob(
+			moduleName,
+			jobName,
+			payload,
+			newOptions
+		);
+	}
+}

+ 64 - 21
backend/src/JobQueue.ts

@@ -21,15 +21,18 @@ export default class JobQueue {
 		}
 	>;
 
+	private processLock: boolean;
+
 	/**
 	 * Job Queue
 	 */
 	public constructor() {
-		this.concurrency = 10;
+		this.concurrency = 1;
 		this.isPaused = true;
 		this.queue = [];
 		this.active = [];
 		this.stats = {};
+		this.processLock = false;
 	}
 
 	/**
@@ -75,23 +78,13 @@ export default class JobQueue {
 	}
 
 	/**
-	 * process - Process queue
+	 * Actually run a job function
+	 *
+	 * @param {Job} job
+	 * @memberof JobQueue
 	 */
-	private process(): void {
-		if (
-			this.isPaused ||
-			this.active.length >= this.concurrency ||
-			this.queue.length === 0
-		)
-			return;
-
-		const job = this.queue.reduce((a, b) =>
-			a.getPriority() <= b.getPriority() ? a : b
-		);
-		if (job.getPriority() === -1) return;
-
-		this.queue.splice(this.queue.indexOf(job), 1);
-		this.active.push(job);
+	public runJob(job: Job) {
+		// Record when we started a job
 		const startTime = Date.now();
 
 		job.execute()
@@ -108,13 +101,63 @@ export default class JobQueue {
 					"averageTime",
 					Date.now() - startTime
 				);
-				this.active.splice(this.active.indexOf(job), 1);
-				setTimeout(() => {
-					this.process();
-				}, 0);
+
+				// If the current job is in the active jobs array, remove it, and then run the process function to run another job
+				const activeJobIndex = this.active.indexOf(job);
+				if (activeJobIndex > -1) {
+					this.active.splice(activeJobIndex, 1);
+					setTimeout(() => {
+						this.process();
+					}, 0);
+				}
 			});
 	}
 
+	/**
+	 * process - Process queue
+	 */
+	private process(): void {
+		// If the process is locked, don't continue. This prevents running process at the same time which could lead to issues
+		if (this.processLock) return;
+		// If the queue is paused, we've reached the maximum number of active jobs, or there are no jobs in the queue, don't continue
+		if (
+			this.isPaused ||
+			this.active.length >= this.concurrency ||
+			this.queue.length === 0
+		)
+			return;
+
+		// Lock the process function
+		this.processLock = true;
+
+		// Sort jobs based on priority, with a lower priority being preferred
+		const jobs = this.queue.sort(
+			(a, b) => a.getPriority() - b.getPriority()
+		);
+
+		// Loop through all jobs
+		for (let i = 0; i < jobs.length; i += 1) {
+			const job = jobs[i];
+
+			// If the module of the job is not started, we can't run the job, so go to the next job in the queue
+			// eslint-disable-next-line no-continue
+			if (job.getModule().getStatus() !== "STARTED") continue;
+
+			// Remove the job from the queue and add it to the active jobs array
+			this.queue.splice(this.queue.indexOf(job), 1);
+			this.active.push(job);
+
+			// Run the job
+			this.runJob(job);
+
+			// Stop the for loop
+			break;
+		}
+
+		// Unlock the process after the for loop is finished, so it can be run again
+		this.processLock = false;
+	}
+
 	/**
 	 * getStatus - Get status of job queue
 	 *

+ 8 - 8
backend/src/LogBook.ts

@@ -46,13 +46,13 @@ export default class LogBook {
 				data: false,
 				color: true,
 				exclude: [
-					{
-						category: "jobs",
-						type: "success"
-					},
-					{
-						type: "debug"
-					}
+					// {
+					// 	category: "jobs",
+					// 	type: "success"
+					// },
+					// {
+					// 	type: "debug"
+					// }
 				]
 			},
 			file: {
@@ -157,7 +157,7 @@ export default class LogBook {
 					break;
 			}
 		if (this.outputs[destination].timestamp)
-			message += `| ${log.timestamp} `;
+			message += `| ${new Date(log.timestamp).toISOString()} `;
 		if (this.outputs[destination].title)
 			message += centerString(title ? title.substring(0, 20) : "", 24);
 		if (this.outputs[destination].type)

+ 64 - 50
backend/src/ModuleManager.ts

@@ -1,8 +1,10 @@
 import async from "async";
 import BaseModule from "./BaseModule";
 import Job from "./Job";
+import JobContext from "./JobContext";
 import JobQueue from "./JobQueue";
 import LogBook from "./LogBook";
+import { JobOptions } from "./types/JobOptions";
 import { Jobs, Modules, ModuleStatus, ModuleClass } from "./types/Modules";
 
 export default class ModuleManager {
@@ -156,19 +158,24 @@ export default class ModuleManager {
 	 * @param params - Params
 	 */
 	public runJob<
-		M extends keyof Jobs & keyof Modules,
-		J extends keyof Jobs[M] & keyof Omit<Modules[M], keyof BaseModule>,
-		P extends "payload" extends keyof Jobs[M][J]
-			? Jobs[M][J]["payload"]
-			: undefined,
-		R = "returns" extends keyof Jobs[M][J] ? Jobs[M][J]["returns"] : never
+		ModuleNameType extends keyof Jobs & keyof Modules,
+		JobNameType extends keyof Jobs[ModuleNameType] &
+			keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
+		PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
+				? void
+				: Jobs[ModuleNameType][JobNameType]["payload"]
+			: void,
+		ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["returns"]
+			: never
 	>(
-		moduleName: M,
-		jobName: J,
-		...params: P extends undefined ? [] : [P, { priority?: number }?]
-	): Promise<R> {
-		const [payload, options] = params;
-		return new Promise<R>((resolve, reject) => {
+		moduleName: ModuleNameType,
+		jobName: JobNameType,
+		payload: PayloadType,
+		options?: JobOptions
+	): Promise<ReturnType> {
+		return new Promise<ReturnType>((resolve, reject) => {
 			const module = this.modules && this.modules[moduleName];
 			if (!module) reject(new Error("Module not found."));
 			else {
@@ -180,46 +187,53 @@ export default class ModuleManager {
 				)
 					reject(new Error("Illegal job function."));
 				else {
-					this.jobQueue.add(
-						new Job(
-							jobName.toString(),
-							moduleName,
-							(job, resolveJob, rejectJob) => {
-								jobFunction
-									.apply(module, [payload])
-									.then((response: R) => {
-										this.logBook.log({
-											message:
-												"Job completed successfully",
-											type: "success",
-											category: "jobs",
-											data: {
-												jobName: job.getName(),
-												jobId: job.getUuid()
-											}
-										});
-										resolveJob();
-										resolve(response);
-									})
-									.catch((err: any) => {
-										this.logBook.log({
-											message: `Job failed with error "${err}"`,
-											type: "error",
-											category: "jobs",
-											data: {
-												jobName: job.getName(),
-												jobId: job.getUuid()
-											}
-										});
-										rejectJob();
-										reject(err);
+					const job = new Job(
+						jobName.toString(),
+						module,
+						(job, resolveJob, rejectJob) => {
+							const jobContext = new JobContext(
+								this,
+								this.logBook,
+								job
+							);
+							jobFunction
+								.apply(jobContext, [payload])
+								.then((response: ReturnType) => {
+									this.logBook.log({
+										message: "Job completed successfully",
+										type: "success",
+										category: "jobs",
+										data: {
+											jobName: job.getName(),
+											jobId: job.getUuid()
+										}
 									});
-							},
-							{
-								priority: (options && options.priority) || 10
-							}
-						)
+									resolveJob();
+									resolve(response);
+								})
+								.catch((err: any) => {
+									this.logBook.log({
+										message: `Job failed with error "${err}"`,
+										type: "error",
+										category: "jobs",
+										data: {
+											jobName: job.getName(),
+											jobId: job.getUuid()
+										}
+									});
+									rejectJob();
+									reject(err);
+								});
+						},
+						{
+							priority: (options && options.priority) || 10
+						}
 					);
+
+					// If a job options.runDirectly is set to true, skip the queue and run a job directly
+					if (options && options.runDirectly)
+						this.jobQueue.runJob(job);
+					else this.jobQueue.add(job);
 				}
 			}
 		});

+ 38 - 12
backend/src/main.ts

@@ -10,20 +10,27 @@ moduleManager.startup();
 // eslint-disable-next-line
 // @ts-ignore
 global.moduleManager = moduleManager;
+// eslint-disable-next-line
+// @ts-ignore
+global.rs = () => {
+	process.exit();
+};
 
-// const interval = setInterval(() => {
-// 	moduleManager
-// 		.runJob("stations", "addToQueue", { songId: "TestId" })
-// 		.catch(() => {});
-// 	moduleManager.runJob("stations", "addA").catch(() => {});
-// 	moduleManager
-// 		.runJob("others", "doThing", { test: "Test", test2: 123 })
-// 		.catch(() => {});
-// }, 40);
+const interval = setInterval(() => {
+	moduleManager
+		.runJob("stations", "addToQueue", { songId: "TestId" })
+		.catch(() => {});
+	moduleManager
+		.runJob("stations", "addA", void 0, { priority: 5 })
+		.catch(() => {});
+	moduleManager
+		.runJob("others", "doThing", { test: "Test", test2: 123 })
+		.catch(() => {});
+}, 40);
 
-// setTimeout(() => {
-// 	clearTimeout(interval);
-// }, 20000);
+setTimeout(() => {
+	clearTimeout(interval);
+}, 3000);
 
 process.on("uncaughtException", err => {
 	if (err.name === "ECONNREFUSED" || err.name === "UNCERTAIN_STATE") return;
@@ -31,6 +38,14 @@ process.on("uncaughtException", err => {
 	console.log(`UNCAUGHT EXCEPTION: ${err.stack}`);
 });
 
+const shutdown = async () => {
+	await moduleManager.shutdown().catch(() => process.exit(1));
+	process.exit(0);
+};
+process.on("SIGINT", shutdown);
+process.on("SIGQUIT", shutdown);
+process.on("SIGTERM", shutdown);
+
 // const shutdown = () => {
 // 	moduleManager
 // 		.shutdown()
@@ -45,6 +60,17 @@ process.on("uncaughtException", err => {
 const runCommand = (line: string) => {
 	const [command, ...args] = line.split(" ");
 	switch (command) {
+		case "help": {
+			console.log("Commands:");
+			console.log("status");
+			console.log("stats");
+			console.log("queue");
+			console.log("active");
+			console.log("eval");
+			console.log("debug");
+			console.log("log");
+			break;
+		}
 		case "status": {
 			console.log("Module Manager Status:");
 			console.table(moduleManager.getStatus());

+ 19 - 3
backend/src/modules/StationModule.ts

@@ -1,3 +1,4 @@
+import JobContext from "src/JobContext";
 import { UniqueMethods } from "../types/Modules";
 import BaseModule from "../BaseModule";
 import ModuleManager from "../ModuleManager";
@@ -44,14 +45,29 @@ export default class StationModule extends BaseModule {
 		});
 	}
 
-	public addA(): Promise<{ number: number }> {
+	public addA(this: JobContext): Promise<{ number: number }> {
 		return new Promise<{ number: number }>(resolve => {
-			resolve({ number: 123 });
+			this.log("ADDA");
+			this.runJob("stations", "addB", void 0, { priority: 5 }).then(
+				() => {
+					resolve({ number: 123 });
+				}
+			);
+		});
+	}
+
+	public addB(this: JobContext): Promise<void> {
+		return new Promise<void>(resolve => {
+			this.log("ADDB");
+			this.runJob("stations", "addC", void 0).then(() => {
+				resolve();
+			});
 		});
 	}
 
-	public addB(): Promise<void> {
+	public addC(this: JobContext): Promise<void> {
 		return new Promise<void>(resolve => {
+			this.log("ADDC");
 			resolve();
 		});
 	}

+ 4 - 0
backend/src/types/JobOptions.ts

@@ -0,0 +1,4 @@
+export type JobOptions = {
+	priority?: number;
+	runDirectly?: boolean;
+};

+ 2 - 0
backend/src/types/Modules.ts

@@ -4,6 +4,8 @@ import StationModule, { StationModuleJobs } from "../modules/StationModule";
 import ModuleManager from "../ModuleManager";
 import BaseModule from "../BaseModule";
 
+export type Module = BaseModule;
+
 export type ModuleClass<Module extends typeof BaseModule> = {
 	new (moduleManager: ModuleManager): Module;
 };