|
@@ -11,6 +11,10 @@ import config from "config";
|
|
|
import { forEachIn } from "@common/utils/forEachIn";
|
|
|
import BaseModule, { ModuleStatus } from "@/BaseModule";
|
|
|
import WebSocketModule from "./WebSocketModule";
|
|
|
+import TestEvent from "@/modules/EventsModule/events/TestEvent";
|
|
|
+import Event from "@/modules/EventsModule/Event";
|
|
|
+import { EventDerived } from "@/types/EventDerived";
|
|
|
+import assertEventDerived from "@/utils/assertEventDerived";
|
|
|
|
|
|
export class EventsModule extends BaseModule {
|
|
|
private _pubClient?: RedisClientType<
|
|
@@ -25,6 +29,8 @@ export class EventsModule extends BaseModule {
|
|
|
RedisScripts
|
|
|
>;
|
|
|
|
|
|
+ private _events: Record<string, typeof Event>;
|
|
|
+
|
|
|
private _subscriptions: Record<string, ((message: any) => Promise<void>)[]>;
|
|
|
|
|
|
private _pSubscriptions: Record<
|
|
@@ -42,6 +48,9 @@ export class EventsModule extends BaseModule {
|
|
|
public constructor() {
|
|
|
super("events");
|
|
|
|
|
|
+ this._events = {
|
|
|
+ test: TestEvent
|
|
|
+ };
|
|
|
this._subscriptions = {};
|
|
|
this._pSubscriptions = {};
|
|
|
this._socketSubscriptions = {};
|
|
@@ -166,6 +175,15 @@ export class EventsModule extends BaseModule {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ public getEvent(channel: string) {
|
|
|
+ const EventClass = this._events[channel];
|
|
|
+
|
|
|
+ if (!EventClass)
|
|
|
+ throw new Error(`Event with channel "${channel}" not found.`);
|
|
|
+
|
|
|
+ return EventClass;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* createKey - Create hex key
|
|
|
*/
|
|
@@ -182,13 +200,11 @@ export class EventsModule extends BaseModule {
|
|
|
/**
|
|
|
* publish - Publish an event
|
|
|
*/
|
|
|
- public async publish(channel: string, value: any) {
|
|
|
+ public async publish(event: typeof Event) {
|
|
|
if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
|
|
|
|
|
|
- if (!value) throw new Error("Invalid value");
|
|
|
-
|
|
|
- if (["object", "array"].includes(typeof value))
|
|
|
- value = JSON.stringify(value);
|
|
|
+ const channel = event.getChannel();
|
|
|
+ const value = (event.constructor as EventDerived).makeMessage();
|
|
|
|
|
|
await this._pubClient.publish(this._createKey("event", channel), value);
|
|
|
}
|
|
@@ -202,19 +218,13 @@ export class EventsModule extends BaseModule {
|
|
|
if (type !== "event") return;
|
|
|
|
|
|
const channel = key.substring(key.indexOf(".") + 1);
|
|
|
-
|
|
|
- if (message.startsWith("[") || message.startsWith("{"))
|
|
|
- try {
|
|
|
- message = JSON.parse(message);
|
|
|
- } catch (err) {
|
|
|
- console.error(err);
|
|
|
- }
|
|
|
- else if (message.startsWith('"') && message.endsWith('"'))
|
|
|
- message = message.substring(1).substring(0, message.length - 2);
|
|
|
+ const EventClass = this.getEvent(channel);
|
|
|
+ const parsedMessage = EventClass.parseMessage(message);
|
|
|
+ const event = new EventClass(parsedMessage);
|
|
|
|
|
|
if (this._subscriptions && this._subscriptions[channel])
|
|
|
await forEachIn(this._subscriptions[channel], async cb =>
|
|
|
- cb(message)
|
|
|
+ cb(event)
|
|
|
);
|
|
|
|
|
|
if (this._pSubscriptions)
|
|
@@ -223,7 +233,7 @@ export class EventsModule extends BaseModule {
|
|
|
new RegExp(subscription).test(channel)
|
|
|
),
|
|
|
async ([, callbacks]) =>
|
|
|
- forEachIn(callbacks, async cb => cb(message))
|
|
|
+ forEachIn(callbacks, async cb => cb(event))
|
|
|
);
|
|
|
|
|
|
if (!this._socketSubscriptions[channel]) return;
|
|
@@ -231,7 +241,7 @@ export class EventsModule extends BaseModule {
|
|
|
for await (const socketId of this._socketSubscriptions[
|
|
|
channel
|
|
|
].values()) {
|
|
|
- await WebSocketModule.dispatch(socketId, channel, message);
|
|
|
+ await WebSocketModule.dispatch(socketId, channel, event.getData());
|
|
|
}
|
|
|
}
|
|
|
|