瀏覽代碼

refactor: Call EventsModule methods directly

Owen Diffey 1 年之前
父節點
當前提交
2d4c7ef55f
共有 4 個文件被更改,包括 32 次插入75 次删除
  1. 8 19
      backend/src/main.ts
  2. 7 10
      backend/src/modules/APIModule.ts
  3. 2 4
      backend/src/modules/DataModule.ts
  4. 15 42
      backend/src/modules/EventsModule.ts

+ 8 - 19
backend/src/main.ts

@@ -5,6 +5,7 @@ import LogBook from "@/LogBook";
 import JobQueue from "@/JobQueue";
 import JobStatistics from "@/JobStatistics";
 import DataModule from "@/modules/DataModule";
+import EventsModule from "./modules/EventsModule";
 
 process.removeAllListeners("uncaughtException");
 process.on("uncaughtException", err => {
@@ -56,29 +57,17 @@ ModuleManager.startup().then(async () => {
 
 	// Events schedule (was notifications)
 	const now = Date.now();
-	await JobQueue.runJob("events", "schedule", {
-		channel: "test",
-		time: 30000
-	});
-	await JobQueue.runJob("events", "subscribe", {
-		channel: "test",
-		type: "schedule",
-		callback: async () => {
-			console.log(`SCHEDULED: ${now} :: ${Date.now()}`);
-		}
+	EventsModule.schedule("test", 30000);
+
+	await EventsModule.subscribe("schedule", "test", async () => {
+		console.log(`SCHEDULED: ${now} :: ${Date.now()}`);
 	});
 
 	// Events (was cache pub/sub)
-	await JobQueue.runJob("events", "subscribe", {
-		channel: "test",
-		callback: async value => {
-			console.log(`PUBLISHED: ${value}`);
-		}
-	});
-	await JobQueue.runJob("events", "publish", {
-		channel: "test",
-		value: "a value!"
+	await EventsModule.subscribe("event", "test", async value => {
+		console.log(`PUBLISHED: ${value}`);
 	});
+	await EventsModule.publish("test", "a value!");
 });
 
 // TOOD remove, or put behind debug option

+ 7 - 10
backend/src/modules/APIModule.ts

@@ -12,6 +12,7 @@ import { Models } from "@/types/Models";
 import ModuleManager from "@/ModuleManager";
 import JobQueue from "@/JobQueue";
 import DataModule from "@/modules/DataModule";
+import EventsModule from "./EventsModule";
 
 export class APIModule extends BaseModule {
 	private _subscriptions: Record<string, Set<string>>;
@@ -324,11 +325,9 @@ export class APIModule extends BaseModule {
 		this._subscriptions[channel].add(socketId);
 
 		if (this._subscriptions[channel].size === 1)
-			await context.executeJob("events", "subscribe", {
-				type: "event",
-				channel,
-				callback: value => this._subscriptionCallback(channel, value)
-			});
+			await EventsModule.subscribe("event", channel, value =>
+				this._subscriptionCallback(channel, value)
+			);
 	}
 
 	public async subscribeMany(
@@ -367,11 +366,9 @@ export class APIModule extends BaseModule {
 		this._subscriptions[channel].delete(socketId);
 
 		if (this._subscriptions[channel].size === 0) {
-			await context.executeJob("events", "unsubscribe", {
-				type: "event",
-				channel,
-				callback: value => this._subscriptionCallback(channel, value)
-			});
+			await EventsModule.unsubscribe("event", channel, value =>
+				this._subscriptionCallback(channel, value)
+			);
 
 			delete this._subscriptions[channel];
 		}

+ 2 - 4
backend/src/modules/DataModule.ts

@@ -21,6 +21,7 @@ import { UniqueMethods } from "@/types/Modules";
 import { AnyModel, Models } from "@/types/Models";
 import { Schemas } from "@/types/Schemas";
 import JobQueue from "@/JobQueue";
+import EventsModule from "./EventsModule";
 
 /**
  * Experimental: function to get all nested keys from a MongoDB query object
@@ -334,10 +335,7 @@ export class DataModule extends BaseModule {
 
 					if (action !== "created") channel += `.${modelId}`;
 
-					await JobQueue.runJob("events", "publish", {
-						channel,
-						value: { doc, oldDoc }
-					});
+					await EventsModule.publish(channel, { doc, oldDoc });
 				});
 			});
 	}

+ 15 - 42
backend/src/modules/EventsModule.ts

@@ -22,7 +22,7 @@ export class EventsModule extends BaseModule {
 
 		this._subscriptions = {};
 		this._scheduleCallbacks = {};
-		this._jobApiDefault = false;
+		this._jobConfigDefault = "disabled";
 	}
 
 	/**
@@ -154,14 +154,9 @@ export class EventsModule extends BaseModule {
 	/**
 	 * publish - Publish an event
 	 */
-	public async publish(
-		context: JobContext,
-		payload: { channel: string; value: any }
-	) {
+	public async publish(channel: string, value: any) {
 		if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
 
-		let { channel, value } = payload;
-
 		channel = this._createKey("event", channel);
 
 		if (!value) throw new Error("Invalid value");
@@ -196,19 +191,14 @@ export class EventsModule extends BaseModule {
 	 * subscribe - Subscribe to an event or schedule completion
 	 */
 	public async subscribe(
-		context: JobContext,
-		payload: {
-			type?: "event" | "schedule";
-			channel: string;
-			callback: (message?: any) => Promise<void>;
-			unique?: boolean;
-		}
+		type: "event" | "schedule",
+		channel: string,
+		callback: (message?: any) => Promise<void>,
+		unique = false
 	) {
 		if (!this._subClient) throw new Error("Redis subClient unavailable.");
 
-		const { type = "event", callback, unique = false } = payload;
-
-		const channel = this._createKey(type, payload.channel);
+		channel = this._createKey(type, channel);
 
 		if (type === "schedule") {
 			if (
@@ -248,17 +238,13 @@ export class EventsModule extends BaseModule {
 	 * unsubscribe - Unsubscribe from an event or schedule completion
 	 */
 	public async unsubscribe(
-		context: JobContext,
-		payload: {
-			type?: "event" | "schedule";
-			channel: string;
-			callback: (message?: any) => Promise<void>;
-		}
+		type: "event" | "schedule",
+		channel: string,
+		callback: (message?: any) => Promise<void>
 	) {
 		if (!this._subClient) throw new Error("Redis subClient unavailable.");
 
-		const { type = "event", callback } = payload;
-		const channel = this._createKey(type, payload.channel);
+		channel = this._createKey(type, channel);
 
 		if (type === "schedule") {
 			if (!this._scheduleCallbacks[channel]) return;
@@ -291,24 +277,16 @@ export class EventsModule extends BaseModule {
 	/**
 	 * schedule - Schedule a callback trigger
 	 */
-	public async schedule(
-		context: JobContext,
-		payload: {
-			channel: string;
-			time: number;
-		}
-	) {
+	public async schedule(channel: string, time: number) {
 		if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
 
-		let { time } = payload;
-
 		if (typeof time !== "number") throw new Error("Time must be a number");
 
 		time = Math.round(time);
 
 		if (time <= 0) throw new Error("Time must be greater than 0");
 
-		const channel = this._createKey("schedule", payload.channel);
+		channel = this._createKey("schedule", channel);
 
 		await this._pubClient.set(channel, "", { PX: time, NX: true });
 	}
@@ -316,15 +294,10 @@ export class EventsModule extends BaseModule {
 	/**
 	 * unschedule - Unschedule a callback trigger
 	 */
-	public async unschedule(
-		context: JobContext,
-		payload: {
-			channel: string;
-		}
-	) {
+	public async unschedule(channel: string) {
 		if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
 
-		const channel = this._createKey("schedule", payload.channel);
+		channel = this._createKey("schedule", channel);
 
 		await this._pubClient.del(channel);
 	}