Browse Source

refactor: Dispatch websocket job callback from job

Owen Diffey 1 năm trước cách đây
mục cha
commit
bd8a3e6608

+ 41 - 8
backend/src/Job.ts

@@ -4,6 +4,7 @@ import LogBook, { Log } from "@/LogBook";
 import ModuleManager from "@/ModuleManager";
 import { JobOptions } from "@/types/JobOptions";
 import { Modules } from "@/types/Modules";
+import WebSocketModule from "./modules/WebSocketModule";
 
 export enum JobStatus {
 	QUEUED = "QUEUED",
@@ -74,9 +75,11 @@ export default class Job {
 		let contextOptions;
 
 		if (options) {
-			const { priority, longJob, session, socketId } = options;
+			const { priority, longJob, session, socketId, callbackRef } =
+				options;
 
-			if (session || socketId) contextOptions = { session, socketId };
+			if (session || socketId)
+				contextOptions = { session, socketId, callbackRef };
 
 			if (priority) this._priority = priority;
 
@@ -174,22 +177,52 @@ export default class Job {
 				.apply(this._module, [this._context, this._payload])
 				// eslint-disable-next-line
 				// @ts-ignore
-				.then(response => {
+				.then(async data => {
+					if (this._context.getSocketId()) {
+						await WebSocketModule.dispatch(
+							this._context.getSocketId(),
+							"jobCallback",
+							this._context.getCallbackRef(),
+							{
+								status: "success",
+								data
+							}
+						);
+					}
+
 					this.log({
 						message: "Job completed successfully",
 						type: "success"
 					});
+
 					JobStatistics.updateStats(this.getName(), "successful");
-					return response;
+
+					return data;
 				})
-				.catch((err: any) => {
+				.catch(async (error: any) => {
+					const message = error?.message ?? error;
+
+					if (this._context.getSocketId()) {
+						await WebSocketModule.dispatch(
+							this._context.getSocketId(),
+							"jobCallback",
+							this._context.getCallbackRef(),
+							{
+								status: "error",
+								message
+							}
+						);
+					}
+
 					this.log({
-						message: `Job failed with error "${err}"`,
+						message: `Job failed with error "${message}"`,
 						type: "error",
-						data: { error: err }
+						data: { error }
 					});
+
 					JobStatistics.updateStats(this.getName(), "failed");
-					throw err;
+
+					throw error;
 				})
 				.finally(() => {
 					this._completedAt = performance.now();

+ 12 - 1
backend/src/JobContext.ts

@@ -15,13 +15,20 @@ export default class JobContext {
 
 	private readonly _socketId?: string;
 
+	private readonly _callbackRef?: string;
+
 	public constructor(
 		job: Job,
-		options?: { session?: SessionSchema; socketId?: string }
+		options?: {
+			session?: SessionSchema;
+			socketId?: string;
+			callbackRef?: string;
+		}
 	) {
 		this.job = job;
 		this._session = options?.session;
 		this._socketId = options?.socketId;
+		this._callbackRef = options?.callbackRef;
 	}
 
 	/**
@@ -45,6 +52,10 @@ export default class JobContext {
 		return this._socketId;
 	}
 
+	public getCallbackRef() {
+		return this._callbackRef;
+	}
+
 	/**
 	 * executeJob - Execute a job
 	 *

+ 2 - 151
backend/src/modules/APIModule.ts

@@ -1,11 +1,8 @@
-import config from "config";
-import { Types, isObjectIdOrHexString } from "mongoose";
-import { IncomingMessage } from "node:http";
+import { Types } from "mongoose";
 import { UserRole } from "@/models/schemas/users/UserRole";
 import JobContext from "@/JobContext";
 import BaseModule from "@/BaseModule";
-import { Jobs, Modules, UniqueMethods } from "@/types/Modules";
-import WebSocket from "@/WebSocket";
+import { UniqueMethods } from "@/types/Modules";
 import permissions from "@/permissions";
 import Job from "@/Job";
 import { Models } from "@/types/Models";
@@ -27,11 +24,6 @@ export class APIModule extends BaseModule {
 		this._dependentModules = ["cache", "data", "events", "websocket"];
 
 		this._subscriptions = {};
-
-		this._jobConfig = {
-			prepareWebsocket: false,
-			runJob: false
-		};
 	}
 
 	/**
@@ -54,147 +46,6 @@ export class APIModule extends BaseModule {
 		await super._stopped();
 	}
 
-	/**
-	 * runJob - Run a job
-	 */
-	public async runJob<
-		ModuleNameType extends keyof Jobs & keyof Modules,
-		JobNameType extends keyof Jobs[ModuleNameType] &
-			keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
-		PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
-			? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
-				? Record<string, never>
-				: Jobs[ModuleNameType][JobNameType]["payload"]
-			: Record<string, never>,
-		ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
-			? Jobs[ModuleNameType][JobNameType]["returns"]
-			: never
-	>(
-		context: JobContext,
-		{
-			moduleName,
-			jobName,
-			payload,
-			sessionId,
-			socketId
-		}: {
-			moduleName: ModuleNameType;
-			jobName: JobNameType;
-			payload: PayloadType;
-			sessionId?: string;
-			socketId?: string;
-		}
-	): Promise<ReturnType> {
-		let session;
-		if (sessionId) {
-			const Session = await DataModule.getModel("sessions");
-
-			session = await Session.findByIdAndUpdate(sessionId, {
-				updatedAt: Date.now()
-			});
-		}
-
-		return context.executeJob(moduleName, jobName, payload, {
-			session,
-			socketId
-		});
-	}
-
-	/**
-	 * getCookieValueFromHeader - Get value of a cookie from cookie header string
-	 */
-	private _getCookieValueFromHeader(cookieName: string, header: string) {
-		const cookie = header
-			.split("; ")
-			.find(
-				cookie =>
-					cookie.substring(0, cookie.indexOf("=")) === cookieName
-			);
-
-		return cookie?.substring(cookie.indexOf("=") + 1, cookie.length);
-	}
-
-	/**
-	 * prepareWebsocket - Prepare websocket connection
-	 */
-	public async prepareWebsocket(
-		context: JobContext,
-		{ socket, request }: { socket: WebSocket; request: IncomingMessage }
-	) {
-		const socketId = request.headers["sec-websocket-key"];
-		socket.setSocketId(socketId);
-
-		let sessionId = request.headers.cookie
-			? this._getCookieValueFromHeader(
-					config.get<string>("cookie"),
-					request.headers.cookie
-			  )
-			: undefined;
-
-		if (sessionId && isObjectIdOrHexString(sessionId))
-			socket.setSessionId(sessionId);
-		else sessionId = undefined;
-
-		let user;
-		if (sessionId) {
-			const Session = await DataModule.getModel("sessions");
-
-			const session = await Session.findByIdAndUpdate(sessionId, {
-				updatedAt: Date.now()
-			});
-
-			if (session) {
-				context.setSession(session);
-
-				user = await context.getUser().catch(() => undefined);
-			}
-		}
-
-		socket.on("close", async () => {
-			if (socketId)
-				await JobQueue.runJob(
-					"api",
-					"unsubscribeAll",
-					{},
-					{
-						socketId
-					}
-				);
-		});
-
-		return {
-			config: {
-				cookie: config.get("cookie"),
-				sitename: config.get("sitename"),
-				recaptcha: {
-					enabled: config.get("apis.recaptcha.enabled"),
-					key: config.get("apis.recaptcha.key")
-				},
-				githubAuthentication: config.get("apis.github.enabled"),
-				messages: config.get("messages"),
-				christmas: config.get("christmas"),
-				footerLinks: config.get("footerLinks"),
-				shortcutOverrides: config.get("shortcutOverrides"),
-				registrationDisabled: config.get("registrationDisabled"),
-				mailEnabled: config.get("mail.enabled"),
-				discogsEnabled: config.get("apis.discogs.enabled"),
-				experimental: {
-					changable_listen_mode: config.get(
-						"experimental.changable_listen_mode"
-					),
-					media_session: config.get("experimental.media_session"),
-					disable_youtube_search: config.get(
-						"experimental.disable_youtube_search"
-					),
-					station_history: config.get("experimental.station_history"),
-					soundcloud: config.get("experimental.soundcloud"),
-					spotify: config.get("experimental.spotify")
-				}
-			},
-			user
-		};
-	}
-
 	public async getUserPermissions(context: JobContext) {
 		const user = await context.getUser().catch(() => null);
 

+ 100 - 30
backend/src/modules/WebSocketModule.ts

@@ -2,7 +2,7 @@ import config from "config";
 import express from "express";
 import http, { Server, IncomingMessage } from "node:http";
 import { RawData, WebSocketServer } from "ws";
-import { Types } from "mongoose";
+import { Types, isObjectIdOrHexString } from "mongoose";
 import BaseModule from "@/BaseModule";
 import { UniqueMethods } from "@/types/Modules";
 import WebSocket from "@/WebSocket";
@@ -10,6 +10,7 @@ import JobContext from "@/JobContext";
 import Job from "@/Job";
 import ModuleManager from "@/ModuleManager";
 import JobQueue from "@/JobQueue";
+import DataModule from "./DataModule";
 
 export class WebSocketModule extends BaseModule {
 	private _httpServer?: Server;
@@ -24,12 +25,7 @@ export class WebSocketModule extends BaseModule {
 	public constructor() {
 		super("websocket");
 
-		this._jobConfigDefault = false;
-
-		this._jobConfig = {
-			getSocket: "disabled",
-			getSockets: "disabled"
-		};
+		this._jobConfigDefault = "disabled";
 	}
 
 	/**
@@ -99,10 +95,73 @@ export class WebSocketModule extends BaseModule {
 			return;
 		}
 
-		const readyData = await new Job("prepareWebsocket", "api", {
-			socket,
-			request
-		}).execute();
+		socket.setSocketId(request.headers["sec-websocket-key"]);
+
+		let sessionId;
+		let user;
+
+		if (request.headers.cookie) {
+			sessionId = request.headers.cookie
+				.split("; ")
+				.find(
+					cookie =>
+						cookie.substring(0, cookie.indexOf("=")) ===
+						config.get<string>("cookie")
+				);
+
+			sessionId = sessionId?.substring(
+				sessionId.indexOf("=") + 1,
+				sessionId.length
+			);
+		}
+
+		if (sessionId && isObjectIdOrHexString(sessionId)) {
+			socket.setSessionId(sessionId);
+
+			const Session = await DataModule.getModel("sessions");
+
+			const session = await Session.findByIdAndUpdate(sessionId, {
+				updatedAt: Date.now()
+			});
+
+			if (session) {
+				const User = await DataModule.getModel("users");
+
+				user = await User.findById(session.userId);
+			}
+		}
+
+		const readyData = {
+			config: {
+				cookie: config.get("cookie"),
+				sitename: config.get("sitename"),
+				recaptcha: {
+					enabled: config.get("apis.recaptcha.enabled"),
+					key: config.get("apis.recaptcha.key")
+				},
+				githubAuthentication: config.get("apis.github.enabled"),
+				messages: config.get("messages"),
+				christmas: config.get("christmas"),
+				footerLinks: config.get("footerLinks"),
+				shortcutOverrides: config.get("shortcutOverrides"),
+				registrationDisabled: config.get("registrationDisabled"),
+				mailEnabled: config.get("mail.enabled"),
+				discogsEnabled: config.get("apis.discogs.enabled"),
+				experimental: {
+					changable_listen_mode: config.get(
+						"experimental.changable_listen_mode"
+					),
+					media_session: config.get("experimental.media_session"),
+					disable_youtube_search: config.get(
+						"experimental.disable_youtube_search"
+					),
+					station_history: config.get("experimental.station_history"),
+					soundcloud: config.get("experimental.soundcloud"),
+					spotify: config.get("experimental.spotify")
+				}
+			},
+			user
+		};
 
 		socket.log({
 			type: "debug",
@@ -118,6 +177,15 @@ export class WebSocketModule extends BaseModule {
 		);
 
 		socket.on("close", async () => {
+			await JobQueue.runJob(
+				"api",
+				"unsubscribeAll",
+				{},
+				{
+					socketId: socket.getSocketId()
+				}
+			);
+
 			socket.log({
 				type: "debug",
 				message: `WebSocket closed #${socket.getSocketId()}`
@@ -163,18 +231,29 @@ export class WebSocketModule extends BaseModule {
 			const job = module.getJob(jobName);
 			if (!job.api) throw new Error(`Job "${jobName}" not found.`);
 
-			const res = await JobQueue.runJob("api", "runJob", {
+			let session;
+			if (socket.getSessionId()) {
+				const Session = await DataModule.getModel("sessions");
+
+				session = await Session.findByIdAndUpdate(
+					socket.getSessionId(),
+					{
+						updatedAt: Date.now()
+					}
+				);
+			}
+
+			await JobQueue.queueJob(
 				moduleName,
 				jobName,
 				payload,
-				sessionId: socket.getSessionId(),
-				socketId: socket.getSocketId()
-			});
-
-			socket.dispatch("jobCallback", callbackRef, {
-				status: "success",
-				data: res
-			});
+				{},
+				{
+					session,
+					socketId: socket.getSocketId(),
+					callbackRef
+				}
+			);
 		} catch (error) {
 			const message = error?.message ?? error;
 
@@ -218,20 +297,11 @@ export class WebSocketModule extends BaseModule {
 	/**
 	 * dispatch - Dispatch message to socket
 	 */
-	public async dispatch(
-		context: JobContext,
-		{
-			socketId,
-			channel,
-			value
-		}: { socketId: string; channel: string; value?: any }
-	) {
+	public async dispatch(socketId: string, channel: string, ...values) {
 		const socket = await this.getSocket(socketId);
 
 		if (!socket) return;
 
-		const values = Array.isArray(value) ? value : [value];
-
 		socket.dispatch(channel, ...values);
 	}
 

+ 1 - 0
backend/src/types/JobOptions.ts

@@ -5,4 +5,5 @@ export type JobOptions = {
 	longJob?: string;
 	session?: SessionSchema;
 	socketId?: string;
+	callbackRef?: string;
 };