Browse Source

feat: Add pSubscribe to events module

Owen Diffey 1 năm trước cách đây
mục cha
commit
e3bf23f78d
1 tập tin đã thay đổi với 38 bổ sung0 xóa
  1. 38 0
      backend/src/modules/EventsModule.ts

+ 38 - 0
backend/src/modules/EventsModule.ts

@@ -10,6 +10,11 @@ export class EventsModule extends BaseModule {
 
 	private _subscriptions: Record<string, ((message: any) => Promise<void>)[]>;
 
+	private _pSubscriptions: Record<
+		string,
+		((message: any) => Promise<void>)[]
+	>;
+
 	private _socketSubscriptions: Record<string, Set<string>>;
 
 	private _scheduleCallbacks: Record<string, (() => Promise<void>)[]>;
@@ -21,6 +26,7 @@ export class EventsModule extends BaseModule {
 		super("events");
 
 		this._subscriptions = {};
+		this._pSubscriptions = {};
 		this._socketSubscriptions = {};
 		this._scheduleCallbacks = {};
 	}
@@ -183,6 +189,17 @@ export class EventsModule extends BaseModule {
 			this._subscriptions[channel].map(async cb => cb(message))
 		);
 
+		if (this._pSubscriptions)
+			await Promise.all(
+				Object.entries(this._pSubscriptions)
+					.filter(([subscription]) =>
+						new RegExp(channel).test(subscription)
+					)
+					.map(async ([, callbacks]) =>
+						Promise.all(callbacks.map(async cb => cb(message)))
+					)
+			);
+
 		if (!this._socketSubscriptions[channel]) return;
 
 		for await (const socketId of this._socketSubscriptions[
@@ -223,6 +240,27 @@ export class EventsModule extends BaseModule {
 		this._subscriptions[channel].push(callback);
 	}
 
+	/**
+	 * pSubscribe - Subscribe to an event with pattern
+	 */
+	public async pSubscribe(
+		pattern: string,
+		callback: (message?: any) => Promise<void>
+	) {
+		if (!this._subClient) throw new Error("Redis subClient unavailable.");
+
+		if (!this._pSubscriptions[pattern]) {
+			this._pSubscriptions[pattern] = [];
+
+			await this._subClient.pSubscribe(
+				this._createKey("event", pattern),
+				(...args) => this._subscriptionListener(...args)
+			);
+		}
+
+		this._pSubscriptions[pattern].push(callback);
+	}
+
 	/**
 	 * unsubscribe - Unsubscribe from an event or schedule completion
 	 */