|
@@ -11,6 +11,10 @@ import config from "config";
|
|
import { forEachIn } from "@common/utils/forEachIn";
|
|
import { forEachIn } from "@common/utils/forEachIn";
|
|
import BaseModule, { ModuleStatus } from "@/BaseModule";
|
|
import BaseModule, { ModuleStatus } from "@/BaseModule";
|
|
import WebSocketModule from "./WebSocketModule";
|
|
import WebSocketModule from "./WebSocketModule";
|
|
|
|
+import Event from "@/modules/EventsModule/Event";
|
|
|
|
+import { EventDerived } from "@/types/EventDerived";
|
|
|
|
+import assertEventDerived from "@/utils/assertEventDerived";
|
|
|
|
+import ModuleManager from "@/ModuleManager";
|
|
|
|
|
|
export class EventsModule extends BaseModule {
|
|
export class EventsModule extends BaseModule {
|
|
private _pubClient?: RedisClientType<
|
|
private _pubClient?: RedisClientType<
|
|
@@ -29,7 +33,7 @@ export class EventsModule extends BaseModule {
|
|
|
|
|
|
private _pSubscriptions: Record<
|
|
private _pSubscriptions: Record<
|
|
string,
|
|
string,
|
|
- ((message: any) => Promise<void>)[]
|
|
|
|
|
|
+ ((message: string, key: string) => Promise<void>)[]
|
|
>;
|
|
>;
|
|
|
|
|
|
private _socketSubscriptions: Record<string, Set<string>>;
|
|
private _socketSubscriptions: Record<string, Set<string>>;
|
|
@@ -148,7 +152,11 @@ export class EventsModule extends BaseModule {
|
|
|
|
|
|
const { database = 0 } = this._subClient.options ?? {};
|
|
const { database = 0 } = this._subClient.options ?? {};
|
|
|
|
|
|
- await this._subClient.PSUBSCRIBE(
|
|
|
|
|
|
+ await this._subClient.pSubscribe("event.*", async (...args) =>
|
|
|
|
+ this._subscriptionListener(...args)
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ await this._subClient.pSubscribe(
|
|
`__keyevent@${database}__:expired`,
|
|
`__keyevent@${database}__:expired`,
|
|
async message => {
|
|
async message => {
|
|
const type = message.substring(0, message.indexOf("."));
|
|
const type = message.substring(0, message.indexOf("."));
|
|
@@ -166,6 +174,26 @@ export class EventsModule extends BaseModule {
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public getEvent(path: string) {
|
|
|
|
+ const moduleName = path.substring(0, path.indexOf("."));
|
|
|
|
+ const eventName = path.substring(path.indexOf(".") + 1);
|
|
|
|
+
|
|
|
|
+ if (moduleName === this._name) return super.getEvent(eventName);
|
|
|
|
+
|
|
|
|
+ const module = ModuleManager.getModule(moduleName);
|
|
|
|
+ if (!module) throw new Error(`Module "${moduleName}" not found`);
|
|
|
|
+
|
|
|
|
+ return module.getEvent(eventName);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public getAllEvents() {
|
|
|
|
+ return Object.fromEntries(
|
|
|
|
+ Object.entries(ModuleManager.getModules() ?? {}).map(
|
|
|
|
+ ([name, module]) => [name, Object.keys(module.getEvents())]
|
|
|
|
+ )
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* createKey - Create hex key
|
|
* createKey - Create hex key
|
|
*/
|
|
*/
|
|
@@ -182,13 +210,11 @@ export class EventsModule extends BaseModule {
|
|
/**
|
|
/**
|
|
* publish - Publish an event
|
|
* publish - Publish an event
|
|
*/
|
|
*/
|
|
- public async publish(channel: string, value: any) {
|
|
|
|
|
|
+ public async publish(event: typeof Event) {
|
|
if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
|
|
if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
|
|
|
|
|
|
- if (!value) throw new Error("Invalid value");
|
|
|
|
-
|
|
|
|
- if (["object", "array"].includes(typeof value))
|
|
|
|
- value = JSON.stringify(value);
|
|
|
|
|
|
+ const channel = event.getKey();
|
|
|
|
+ const value = event.makeMessage();
|
|
|
|
|
|
await this._pubClient.publish(this._createKey("event", channel), value);
|
|
await this._pubClient.publish(this._createKey("event", channel), value);
|
|
}
|
|
}
|
|
@@ -201,37 +227,29 @@ export class EventsModule extends BaseModule {
|
|
|
|
|
|
if (type !== "event") return;
|
|
if (type !== "event") return;
|
|
|
|
|
|
- const channel = key.substring(key.indexOf(".") + 1);
|
|
|
|
|
|
+ key = key.substring(key.indexOf(".") + 1);
|
|
|
|
|
|
- if (message.startsWith("[") || message.startsWith("{"))
|
|
|
|
- try {
|
|
|
|
- message = JSON.parse(message);
|
|
|
|
- } catch (err) {
|
|
|
|
- console.error(err);
|
|
|
|
- }
|
|
|
|
- else if (message.startsWith('"') && message.endsWith('"'))
|
|
|
|
- message = message.substring(1).substring(0, message.length - 2);
|
|
|
|
|
|
+ const { path, scope } = Event.parseKey(key);
|
|
|
|
+ const EventClass = this.getEvent(path);
|
|
|
|
+ const parsedMessage = EventClass.parseMessage(message);
|
|
|
|
+ const event = new EventClass(parsedMessage, scope);
|
|
|
|
|
|
- if (this._subscriptions && this._subscriptions[channel])
|
|
|
|
- await forEachIn(this._subscriptions[channel], async cb =>
|
|
|
|
- cb(message)
|
|
|
|
- );
|
|
|
|
|
|
+ if (this._subscriptions && this._subscriptions[key])
|
|
|
|
+ await forEachIn(this._subscriptions[key], async cb => cb(event));
|
|
|
|
|
|
if (this._pSubscriptions)
|
|
if (this._pSubscriptions)
|
|
await forEachIn(
|
|
await forEachIn(
|
|
Object.entries(this._pSubscriptions).filter(([subscription]) =>
|
|
Object.entries(this._pSubscriptions).filter(([subscription]) =>
|
|
- new RegExp(subscription).test(channel)
|
|
|
|
|
|
+ new RegExp(subscription).test(key)
|
|
),
|
|
),
|
|
async ([, callbacks]) =>
|
|
async ([, callbacks]) =>
|
|
- forEachIn(callbacks, async cb => cb(message))
|
|
|
|
|
|
+ forEachIn(callbacks, async cb => cb(event))
|
|
);
|
|
);
|
|
|
|
|
|
- if (!this._socketSubscriptions[channel]) return;
|
|
|
|
|
|
+ if (!this._socketSubscriptions[key]) return;
|
|
|
|
|
|
- for await (const socketId of this._socketSubscriptions[
|
|
|
|
- channel
|
|
|
|
- ].values()) {
|
|
|
|
- await WebSocketModule.dispatch(socketId, channel, message);
|
|
|
|
|
|
+ for await (const socketId of this._socketSubscriptions[key].values()) {
|
|
|
|
+ await WebSocketModule.dispatch(socketId, key, event.getData());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -239,31 +257,27 @@ export class EventsModule extends BaseModule {
|
|
* subscribe - Subscribe to an event or schedule completion
|
|
* subscribe - Subscribe to an event or schedule completion
|
|
*/
|
|
*/
|
|
public async subscribe(
|
|
public async subscribe(
|
|
- type: "event" | "schedule",
|
|
|
|
- channel: string,
|
|
|
|
- callback: (message?: any) => Promise<void>
|
|
|
|
|
|
+ EventClass: any,
|
|
|
|
+ callback: (message?: any) => Promise<void>,
|
|
|
|
+ scope?: string
|
|
) {
|
|
) {
|
|
if (!this._subClient) throw new Error("Redis subClient unavailable.");
|
|
if (!this._subClient) throw new Error("Redis subClient unavailable.");
|
|
|
|
|
|
|
|
+ const key = EventClass.getKey(scope);
|
|
|
|
+ const type = EventClass.getType();
|
|
|
|
+
|
|
if (type === "schedule") {
|
|
if (type === "schedule") {
|
|
- if (!this._scheduleCallbacks[channel])
|
|
|
|
- this._scheduleCallbacks[channel] = [];
|
|
|
|
|
|
+ if (!this._scheduleCallbacks[key])
|
|
|
|
+ this._scheduleCallbacks[key] = [];
|
|
|
|
|
|
- this._scheduleCallbacks[channel].push(() => callback());
|
|
|
|
|
|
+ this._scheduleCallbacks[key].push(() => callback());
|
|
|
|
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- if (!this._subscriptions[channel]) {
|
|
|
|
- this._subscriptions[channel] = [];
|
|
|
|
-
|
|
|
|
- await this._subClient.subscribe(
|
|
|
|
- this._createKey(type, channel),
|
|
|
|
- (...args) => this._subscriptionListener(...args)
|
|
|
|
- );
|
|
|
|
- }
|
|
|
|
|
|
+ if (!this._subscriptions[key]) this._subscriptions[key] = [];
|
|
|
|
|
|
- this._subscriptions[channel].push(callback);
|
|
|
|
|
|
+ this._subscriptions[key].push(callback);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -271,18 +285,11 @@ export class EventsModule extends BaseModule {
|
|
*/
|
|
*/
|
|
public async pSubscribe(
|
|
public async pSubscribe(
|
|
pattern: string,
|
|
pattern: string,
|
|
- callback: (message?: any) => Promise<void>
|
|
|
|
|
|
+ callback: (message: string, key: string) => Promise<void>
|
|
) {
|
|
) {
|
|
if (!this._subClient) throw new Error("Redis subClient unavailable.");
|
|
if (!this._subClient) throw new Error("Redis subClient unavailable.");
|
|
|
|
|
|
- if (!this._pSubscriptions[pattern]) {
|
|
|
|
- this._pSubscriptions[pattern] = [];
|
|
|
|
-
|
|
|
|
- await this._subClient.pSubscribe(
|
|
|
|
- this._createKey("event", pattern),
|
|
|
|
- (...args) => this._subscriptionListener(...args)
|
|
|
|
- );
|
|
|
|
- }
|
|
|
|
|
|
+ if (!this._pSubscriptions[pattern]) this._pSubscriptions[pattern] = [];
|
|
|
|
|
|
this._pSubscriptions[pattern].push(callback);
|
|
this._pSubscriptions[pattern].push(callback);
|
|
}
|
|
}
|
|
@@ -319,10 +326,8 @@ export class EventsModule extends BaseModule {
|
|
|
|
|
|
this._subscriptions[channel].splice(index, 1);
|
|
this._subscriptions[channel].splice(index, 1);
|
|
|
|
|
|
- if (this._subscriptions[channel].length === 0) {
|
|
|
|
|
|
+ if (this._subscriptions[channel].length === 0)
|
|
delete this._subscriptions[channel];
|
|
delete this._subscriptions[channel];
|
|
- await this._subClient.unsubscribe(this._createKey(type, channel)); // TODO: Provide callback when unsubscribing
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -354,8 +359,6 @@ export class EventsModule extends BaseModule {
|
|
|
|
|
|
public async subscribeSocket(channel: string, socketId: string) {
|
|
public async subscribeSocket(channel: string, socketId: string) {
|
|
if (!this._socketSubscriptions[channel]) {
|
|
if (!this._socketSubscriptions[channel]) {
|
|
- await this.subscribe("event", channel, () => Promise.resolve());
|
|
|
|
-
|
|
|
|
this._socketSubscriptions[channel] = new Set();
|
|
this._socketSubscriptions[channel] = new Set();
|
|
}
|
|
}
|
|
|
|
|