Procházet zdrojové kódy

refactor: Use events for job callbacks

Owen Diffey před 1 rokem
rodič
revize
8c84b17d92

+ 27 - 29
backend/src/Job.ts

@@ -4,6 +4,7 @@ import LogBook, { Log } from "@/LogBook";
 import { JobOptions } from "@/types/JobOptions";
 import WebSocketModule from "./modules/WebSocketModule";
 import BaseModule from "./BaseModule";
+import EventsModule from "./modules/EventsModule";
 
 export enum JobStatus {
 	QUEUED = "QUEUED",
@@ -55,9 +56,20 @@ export default abstract class Job {
 		payload: unknown,
 		options?: JobOptions
 	) {
+		this._createdAt = performance.now();
 		this._module = module;
 		this._payload = payload;
 		this._priority = 1;
+		this._status = JobStatus.QUEUED;
+		/* eslint-disable no-bitwise, eqeqeq */
+		this._uuid = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replace(
+			/[xy]/g,
+			c => {
+				const r = (Math.random() * 16) | 0;
+				const v = c == "x" ? r : (r & 0x3) | 0x8;
+				return v.toString(16);
+			}
+		);
 
 		let contextOptions;
 
@@ -78,17 +90,7 @@ export default abstract class Job {
 
 		this._context = new JobContext(this, contextOptions);
 
-		/* eslint-disable no-bitwise, eqeqeq */
-		this._uuid = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replace(
-			/[xy]/g,
-			c => {
-				const r = (Math.random() * 16) | 0;
-				const v = c == "x" ? r : (r & 0x3) | 0x8;
-				return v.toString(16);
-			}
-		);
-		this._status = JobStatus.QUEUED;
-		this._createdAt = performance.now();
+		JobStatistics.updateStats(this.getPath(), "added");
 	}
 
 	/**
@@ -198,17 +200,15 @@ export default abstract class Job {
 			const data = await this._execute();
 
 			const socketId = this._context.getSocketId();
+			const callbackRef = this._context.getCallbackRef();
 
-			if (socketId && this._context.getCallbackRef()) {
-				await WebSocketModule.dispatch(
+			if (callbackRef) {
+				await EventsModule.publish(`job.${this.getUuid()}`, {
 					socketId,
-					"jobCallback",
-					this._context.getCallbackRef(),
-					{
-						status: "success",
-						data
-					}
-				);
+					callbackRef,
+					status: "success",
+					data
+				});
 			}
 
 			this.log({
@@ -223,17 +223,15 @@ export default abstract class Job {
 			const message = error?.message ?? error;
 
 			const socketId = this._context.getSocketId();
+			const callbackRef = this._context.getCallbackRef();
 
-			if (socketId && this._context.getCallbackRef()) {
-				await WebSocketModule.dispatch(
+			if (callbackRef) {
+				await EventsModule.publish(`job.${this.getUuid()}`, {
 					socketId,
-					"jobCallback",
-					this._context.getCallbackRef(),
-					{
-						status: "error",
-						message
-					}
-				);
+					callbackRef,
+					status: "error",
+					message
+				});
 			}
 
 			this.log({

+ 12 - 49
backend/src/JobQueue.ts

@@ -7,8 +7,6 @@ export class JobQueue {
 
 	private _isPaused: boolean;
 
-	private _jobs: Job[];
-
 	private _queue: Job[];
 
 	private _active: Job[];
@@ -29,23 +27,12 @@ export class JobQueue {
 	public constructor() {
 		this._concurrency = 50;
 		this._isPaused = true;
-		this._jobs = [];
 		this._queue = [];
 		this._active = [];
 		this._callbacks = {};
 		this._processLock = false;
 	}
 
-	/**
-	 * getJob - Fetch job
-	 *
-	 * @param jobId - Job UUID
-	 * @returns Job if found
-	 */
-	public getJob(jobId: string) {
-		return this._jobs.find(job => job.getUuid() === jobId);
-	}
-
 	/**
 	 * pause - Pause queue
 	 *
@@ -67,19 +54,16 @@ export class JobQueue {
 	 * runJob - Run a job
 	 */
 	public async runJob(
-		moduleName: string,
-		jobName: string,
+		JobClass: typeof Job,
 		payload?: unknown,
 		options?: JobOptions
 	): Promise<unknown> {
 		return new Promise<unknown>((resolve, reject) => {
-			this.queueJob(
-				moduleName,
-				jobName,
-				payload,
-				{ resolve, reject },
-				options
-			).catch(reject);
+			this.queueJob(JobClass, payload, options)
+				.then(uuid => {
+					this._callbacks[uuid] = { resolve, reject };
+				})
+				.catch(reject);
 		});
 	}
 
@@ -87,25 +71,12 @@ export class JobQueue {
 	 * queueJob - Queue a job
 	 */
 	public async queueJob(
-		moduleName: string,
-		jobName: string,
-		payload: unknown,
-		callback: {
-			resolve: (value: unknown) => void;
-			reject: (reason?: unknown) => void;
-		},
+		JobClass: typeof Job,
+		payload?: unknown,
 		options?: JobOptions
 	): Promise<string> {
-		const module = ModuleManager.getModule(moduleName);
-		if (!module) throw new Error("Module not found.");
-
-		const JobClass = module.getJob(jobName);
-
 		const job = new JobClass(payload, options);
 
-		this._callbacks[job.getUuid()] = callback;
-
-		this._jobs.push(job);
 		this._queue.push(job);
 		this._process();
 
@@ -149,12 +120,11 @@ export class JobQueue {
 			this._active.push(job);
 
 			const callback = this._callbacks[job.getUuid()];
+
 			job.execute()
-				.then(callback.resolve)
-				.catch(callback.reject)
+				.then(callback?.resolve)
+				.catch(callback?.reject)
 				.finally(() => {
-					delete this._callbacks[job.getUuid()];
-
 					// If the current job is in the active jobs array, remove it, and then run the process function to run another job
 					const activeJobIndex = this._active.indexOf(job);
 					if (activeJobIndex > -1) {
@@ -163,6 +133,7 @@ export class JobQueue {
 
 					this._process();
 				});
+
 			// Stop the for loop
 			if (this._active.length >= this._concurrency) break;
 		}
@@ -199,14 +170,6 @@ export class JobQueue {
 			status.queue = this._queue.map(job => job.toJSON());
 		return status;
 	}
-
-	/**
-	 * Gets the job array
-	 *
-	 */
-	public getJobs() {
-		return this._jobs;
-	}
 }
 
 export default new JobQueue();

+ 0 - 14
backend/src/main.ts

@@ -123,7 +123,6 @@ const runCommand = (line: string) => {
 			console.log("stats - Shows jobs stats");
 			console.log("queue - Shows a table of all jobs in the queue");
 			console.log("active - Shows a table of all jobs currently running");
-			console.log("jobinfo <jobId> - Print all info about a job");
 			console.log("eval - Run a command");
 			console.log("debug");
 			console.log("log - Change LogBook settings");
@@ -160,19 +159,6 @@ const runCommand = (line: string) => {
 			console.table(activeStatus);
 			break;
 		}
-		case "jobinfo": {
-			if (args.length === 0) console.log("Please specify a jobId");
-			else {
-				const jobId = args[0];
-				const job = JobQueue.getJob(jobId);
-
-				if (!job) console.log(`Job "${jobId}" not found`);
-				else {
-					console.table(job.toJSON());
-				}
-			}
-			break;
-		}
 		case "eval": {
 			const evalCommand = args.join(" ");
 			console.log(`Running eval command: ${evalCommand}`);

+ 6 - 7
backend/src/modules/EventsModule.ts

@@ -151,7 +151,7 @@ export class EventsModule extends BaseModule {
 		if (!channel || typeof channel !== "string")
 			throw new Error("Invalid channel");
 
-		return `${type}:${channel}`;
+		return `${type}.${channel}`;
 	}
 
 	/**
@@ -172,9 +172,7 @@ export class EventsModule extends BaseModule {
 	 * subscriptionListener - Listener for event subscriptions
 	 */
 	private async _subscriptionListener(message: string, key: string) {
-		const [, channel] = key.split(":");
-
-		if (!this._subscriptions || !this._subscriptions[channel]) return;
+		const [, channel] = key.split(".");
 
 		if (message.startsWith("[") || message.startsWith("{"))
 			try {
@@ -185,9 +183,10 @@ export class EventsModule extends BaseModule {
 		else if (message.startsWith('"') && message.endsWith('"'))
 			message = message.substring(1).substring(0, message.length - 2);
 
-		await Promise.all(
-			this._subscriptions[channel].map(async cb => cb(message))
-		);
+		if (this._subscriptions && this._subscriptions[channel])
+			await Promise.all(
+				this._subscriptions[channel].map(async cb => cb(message))
+			);
 
 		if (this._pSubscriptions)
 			await Promise.all(

+ 34 - 20
backend/src/modules/WebSocketModule.ts

@@ -10,6 +10,7 @@ import JobQueue from "@/JobQueue";
 import DataModule from "./DataModule";
 import { UserModel } from "./DataModule/models/users/schema";
 import { SessionModel } from "./DataModule/models/sessions/schema";
+import EventsModule from "./EventsModule";
 
 export class WebSocketModule extends BaseModule {
 	private _httpServer?: Server;
@@ -23,6 +24,8 @@ export class WebSocketModule extends BaseModule {
 	 */
 	public constructor() {
 		super("websocket");
+
+		this._dependentModules = ["data", "events"];
 	}
 
 	/**
@@ -53,6 +56,24 @@ export class WebSocketModule extends BaseModule {
 			clearInterval(this._keepAliveInterval)
 		);
 
+		await EventsModule.pSubscribe("job.*", async payload => {
+			if (
+				!payload ||
+				typeof payload !== "object" ||
+				Array.isArray(payload)
+			)
+				return;
+
+			const { socketId, callbackRef } = payload;
+
+			if (!socketId || !callbackRef) return;
+
+			delete payload.socketId;
+			delete payload.callbackRef;
+
+			this.dispatch(socketId, "jobCallback", callbackRef, payload);
+		});
+
 		await super._started();
 	}
 
@@ -174,18 +195,17 @@ export class WebSocketModule extends BaseModule {
 		);
 
 		socket.on("close", async () => {
-			await JobQueue.runJob(
-				"events",
-				"unsubscribeAll",
-				{},
-				{
-					socketId: socket.getSocketId()
-				}
-			);
+			const socketId = socket.getSocketId();
+
+			const Job = EventsModule.getJob("unsubscribeAll");
+
+			await JobQueue.runJob(Job, null, {
+				socketId
+			});
 
 			socket.log({
 				type: "debug",
-				message: `WebSocket closed #${socket.getSocketId()}`
+				message: `WebSocket closed #${socketId}`
 			});
 		});
 
@@ -243,17 +263,11 @@ export class WebSocketModule extends BaseModule {
 				);
 			}
 
-			await JobQueue.queueJob(
-				moduleName,
-				jobName,
-				payload,
-				{},
-				{
-					session,
-					socketId: socket.getSocketId(),
-					callbackRef
-				}
-			);
+			await JobQueue.queueJob(Job, payload, {
+				session,
+				socketId: socket.getSocketId(),
+				callbackRef
+			});
 		} catch (error) {
 			const message = error?.message ?? error;