Prechádzať zdrojové kódy

feat: Integrate events module with api module

Owen Diffey 1 rok pred
rodič
commit
e268545180

+ 121 - 0
backend/src/modules/APIModule.ts

@@ -10,6 +10,8 @@ import { StationType } from "../schemas/station";
 import permissions from "../permissions";
 
 export default class APIModule extends BaseModule {
+	private subscriptions: Record<string, Set<string>>;
+
 	/**
 	 * API Module
 	 */
@@ -17,6 +19,7 @@ export default class APIModule extends BaseModule {
 		super("api");
 
 		this.dependentModules = ["data", "events", "websocket"];
+		this.subscriptions = {};
 	}
 
 	/**
@@ -34,6 +37,8 @@ export default class APIModule extends BaseModule {
 	public override async shutdown() {
 		await super.shutdown();
 
+		await this.removeAllSubscriptions();
+
 		await super.stopped();
 	}
 
@@ -133,6 +138,13 @@ export default class APIModule extends BaseModule {
 			}
 		}
 
+		socket.on("close", async () => {
+			if (socketId)
+				await this.jobQueue.runJob("api", "unsubscribeAll", {
+					socketId
+				});
+		});
+
 		return {
 			config: {
 				cookie: config.get("cookie"),
@@ -205,6 +217,115 @@ export default class APIModule extends BaseModule {
 
 		return rolePermissions;
 	}
+
+	private async subscriptionCallback(channel: string, value?: any) {
+		const promises = [];
+		for await (const socketId of this.subscriptions[channel].values()) {
+			promises.push(
+				this.jobQueue.runJob("websocket", "dispatch", {
+					socketId,
+					channel,
+					value
+				})
+			);
+		}
+		await Promise.all(promises);
+	}
+
+	public async subscribe(
+		context: JobContext,
+		payload: { channel: string; socketId?: string }
+	) {
+		// TODO: assert perm to join by socketId
+		// TODO: Prevent socketId payload from outside backend
+
+		const { channel } = payload;
+
+		const socketId = payload.socketId ?? context.getSocketId();
+
+		if (!socketId) throw new Error("No socketId specified");
+
+		if (!this.subscriptions[channel])
+			this.subscriptions[channel] = new Set();
+
+		if (this.subscriptions[channel].has(socketId)) return;
+
+		this.subscriptions[channel].add(socketId);
+
+		if (this.subscriptions[channel].size === 1)
+			await context.executeJob("events", "subscribe", {
+				type: "event",
+				channel,
+				callback: value => this.subscriptionCallback(channel, value)
+			});
+	}
+
+	public async unsubscribe(
+		context: JobContext,
+		payload: { channel: string; socketId: string }
+	) {
+		const { channel } = payload;
+
+		const socketId = payload.socketId ?? context.getSocketId();
+
+		if (!socketId) throw new Error("No socketId specified");
+
+		if (
+			!(
+				this.subscriptions[channel] &&
+				this.subscriptions[channel].has(socketId)
+			)
+		)
+			return;
+
+		this.subscriptions[channel].delete(socketId);
+
+		if (this.subscriptions[channel].size === 0)
+			await context.executeJob("events", "unsubscribe", {
+				type: "event",
+				channel,
+				callback: value => this.subscriptionCallback(channel, value)
+			});
+	}
+
+	public async unsubscribeAll(
+		context: JobContext,
+		payload: { socketId: string }
+	) {
+		const socketId = payload.socketId ?? context.getSocketId();
+
+		if (!socketId) throw new Error("No socketId specified");
+
+		await Promise.all(
+			Object.entries(this.subscriptions)
+				.filter(([, socketIds]) => socketIds.has(socketId))
+				.map(([channel]) =>
+					context.executeJob("api", "unsubscribe", {
+						socketId,
+						channel
+					})
+				)
+		);
+	}
+
+	private async removeAllSubscriptions() {
+		await Promise.all(
+			Object.entries(this.subscriptions).map(
+				async ([channel, socketIds]) => {
+					const promises = [];
+					for await (const socketId of socketIds.values()) {
+						promises.push(
+							this.jobQueue.runJob("api", "unsubscribe", {
+								socketId,
+								channel
+							})
+						);
+					}
+					return Promise.all(promises);
+				}
+			)
+		);
+	}
 }
 
 export type APIModuleJobs = {

+ 20 - 0
backend/src/modules/WebSocketModule.ts

@@ -191,6 +191,26 @@ export default class WebSocketModule extends BaseModule {
 		return null;
 	}
 
+	/**
+	 * dispatch - Dispatch message to socket
+	 */
+	public async dispatch(
+		context: JobContext,
+		{
+			socketId,
+			channel,
+			value
+		}: { socketId: string; channel: string; value?: any }
+	) {
+		const socket = await context.executeJob("websocket", "getSocket", {
+			socketId
+		});
+
+		if (!socket) return;
+
+		socket.dispatch(channel, value);
+	}
+
 	/**
 	 * shutdown - Shutdown websocket module
 	 */