Просмотр исходного кода

refactor: Split runJob into queueJob, runJob and executeJob

Owen Diffey 2 лет назад
Родитель
Сommit
13256ff9b1

+ 22 - 7
backend/src/Job.ts

@@ -11,7 +11,7 @@ export enum JobStatus {
 	ACTIVE = "ACTIVE",
 	PAUSED = "PAUSED",
 	COMPLETED = "COMPLETED"
-};
+}
 
 export default class Job {
 	protected name: string;
@@ -39,7 +39,7 @@ export default class Job {
 
 	protected status: JobStatus;
 
-	protected startedAt: number;
+	protected createdAt: number;
 
 	protected logBook: LogBook;
 
@@ -103,7 +103,7 @@ export default class Job {
 			}
 		);
 		this.setStatus(JobStatus.QUEUED);
-		this.startedAt = Date.now();
+		this.createdAt = Date.now();
 	}
 
 	/**
@@ -167,6 +167,7 @@ export default class Job {
 	 */
 	public async execute() {
 		this.setStatus(JobStatus.ACTIVE);
+		const startedAt = Date.now();
 		return (
 			this.jobFunction
 				.apply(this.module, [this.context, this.payload])
@@ -196,7 +197,7 @@ export default class Job {
 					this.jobStatistics.updateStats(
 						this.getName(),
 						"averageTime",
-						Date.now() - this.startedAt
+						Date.now() - startedAt
 					);
 					this.setStatus(JobStatus.COMPLETED);
 				})
@@ -221,11 +222,25 @@ export default class Job {
 			type,
 			category: this.getName(),
 			data: {
-				moduleName: this.module.getName(),
-				jobName: this.name,
-				jobUuid: this.uuid,
+				...this.toJSON(),
 				...data
 			}
 		});
 	}
+
+	/**
+	 * Serialize job info
+	 *
+	 * @returns json
+	 */
+	public toJSON() {
+		return {
+			uuid: this.getUuid(),
+			priority: this.getPriority(),
+			name: this.getName(),
+			status: this.getStatus(),
+			moduleStatus: this.module.getStatus(),
+			createdAt: this.createdAt
+		};
+	}
 }

+ 10 - 8
backend/src/JobContext.ts

@@ -6,9 +6,9 @@ import { JobOptions } from "./types/JobOptions";
 import { Jobs, Modules } from "./types/Modules";
 
 export default class JobContext {
-	protected job: Job;
+	public readonly job: Job;
 
-	protected jobQueue: JobQueue;
+	public readonly jobQueue: JobQueue;
 
 	public constructor(job: Job) {
 		this.job = job;
@@ -25,13 +25,13 @@ export default class JobContext {
 	}
 
 	/**
-	 * runJob - Run a job
+	 * executeJob - Execute a job
 	 *
 	 * @param moduleName - Module name
 	 * @param jobName - Job name
 	 * @param params - Params
 	 */
-	public async runJob<
+	public async executeJob<
 		ModuleNameType extends keyof Jobs & keyof Modules,
 		JobNameType extends keyof Jobs[ModuleNameType] &
 			keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
@@ -49,9 +49,11 @@ export default class JobContext {
 		payload: PayloadType,
 		options?: JobOptions
 	): Promise<ReturnType> {
-		return this.jobQueue.runJob(moduleName, jobName, payload, {
-			runDirectly: true,
-			...(options ?? {})
-		});
+		return new Job(
+			jobName.toString(),
+			moduleName,
+			payload,
+			options
+		).execute();
 	}
 }

+ 53 - 29
backend/src/JobQueue.ts

@@ -21,7 +21,7 @@ export default class JobQueue {
 	private callbacks: Record<
 		string,
 		{
-			resolve: (value: unknown) => void;
+			resolve: (value: any) => void;
 			reject: (reason?: any) => void;
 		}
 	>;
@@ -91,21 +91,55 @@ export default class JobQueue {
 		payload: PayloadType,
 		options?: JobOptions
 	): Promise<ReturnType> {
+		return new Promise<ReturnType>((resolve, reject) => {
+			this.queueJob(
+				moduleName,
+				jobName,
+				payload,
+				{ resolve, reject },
+				options
+			);
+		});
+	}
+
+	/**
+	 * queueJob - Queue a job
+	 *
+	 * @param moduleName - Module name
+	 * @param jobName - Job name
+	 * @param params - Params
+	 */
+	public async queueJob<
+		ModuleNameType extends keyof Jobs & keyof Modules,
+		JobNameType extends keyof Jobs[ModuleNameType] &
+			keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
+		PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
+				? Record<string, never>
+				: Jobs[ModuleNameType][JobNameType]["payload"]
+			: Record<string, never>,
+		ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
+			? Jobs[ModuleNameType][JobNameType]["returns"]
+			: never
+	>(
+		moduleName: ModuleNameType,
+		jobName: JobNameType,
+		payload: PayloadType,
+		callback: {
+			resolve: (value: ReturnType) => void;
+			reject: (reason?: any) => void;
+		},
+		options?: JobOptions
+	): Promise<string> {
 		const job = new Job(jobName.toString(), moduleName, payload, options);
 
-		const runDirectly = !!(options && options.runDirectly);
+		this.callbacks[job.getUuid()] = callback;
 
 		this.jobs.push(job);
-		if (runDirectly) {
-			return job.execute();
-		}
-		return new Promise((resolve, reject) => {
-			this.callbacks[job.getUuid()] = { resolve, reject };
-			this.queue.push(job);
-			this.process();
-		}).finally(() => {
-			delete this.callbacks[job.getUuid()];
-		}) as Promise<ReturnType>;
+		this.queue.push(job);
+		this.process();
+
+		return job.getUuid();
 	}
 
 	/**
@@ -149,6 +183,8 @@ export default class JobQueue {
 				.then(callback.resolve)
 				.catch(callback.reject)
 				.finally(() => {
+					delete this.callbacks[job.getUuid()];
+
 					// If the current job is in the active jobs array, remove it, and then run the process function to run another job
 					const activeJobIndex = this.active.indexOf(job);
 					if (activeJobIndex > -1) {
@@ -185,23 +221,11 @@ export default class JobQueue {
 	 * @returns Job queue statistics
 	 */
 	public getQueueStatus(type?: JobStatus) {
-		const status: Record<
-			string,
-			{
-				uuid: string;
-				priority: number;
-				name: string;
-				status: JobStatus;
-			}[]
-		> = {};
-		const format = (job: Job) => ({
-			uuid: job.getUuid(),
-			priority: job.getPriority(),
-			name: job.getName(),
-			status: job.getStatus()
-		});
-		if (!type || type === "ACTIVE") status.active = this.active.map(format);
-		if (!type || type === "QUEUED") status.queue = this.queue.map(format);
+		const status: Record<string, ReturnType<Job["toJSON"]>[]> = {};
+		if (!type || type === JobStatus.ACTIVE)
+			status.active = this.active.map(job => job.toJSON());
+		if (!type || type === JobStatus.QUEUED)
+			status.queue = this.queue.map(job => job.toJSON());
 		return status;
 	}
 

+ 1 - 9
backend/src/main.ts

@@ -298,15 +298,7 @@ const runCommand = (line: string) => {
 
 				if (!job) console.log("Job not found");
 				else {
-					const jobInfo = {
-						jobId: job?.getUuid(),
-						jobName: job?.getName(),
-						jobStatus: job?.getStatus(),
-						jobPriority: job?.getPriority(),
-						moduleName: job?.getModule().getName(),
-						moduleStatus: job?.getModule().getStatus()
-					};
-					console.table(jobInfo);
+					console.table(job.toJSON());
 				}
 			}
 			break;

+ 5 - 3
backend/src/modules/StationModule.ts

@@ -37,14 +37,16 @@ export default class StationModule extends BaseModule {
 
 	public async addA(context: JobContext) {
 		context.log("ADDA");
-		await context.runJob("stations", "addB", {}, { priority: 5 });
+		await context.jobQueue.runJob("stations", "addB", {}, { priority: 5 });
 		return { number: 123 };
 	}
 
 	public async addB(context: JobContext) {
 		context.log("ADDB");
-		await context.runJob("stations", "addToQueue", { songId: "test" });
-		await context.runJob("stations", "addC", {});
+		await context.jobQueue.runJob("stations", "addToQueue", {
+			songId: "test"
+		});
+		await context.jobQueue.runJob("stations", "addC", {});
 	}
 
 	public async addC(context: JobContext) {

+ 0 - 1
backend/src/types/JobOptions.ts

@@ -1,5 +1,4 @@
 export type JobOptions = {
 	priority?: number;
-	runDirectly?: boolean;
 	longJob?: string;
 };