|
@@ -6,13 +6,13 @@ import { UniqueMethods } from "../types/Modules";
|
|
|
import JobContext from "../JobContext";
|
|
|
|
|
|
export default class EventsModule extends BaseModule {
|
|
|
- private pubClient?: RedisClientType;
|
|
|
+ private _pubClient?: RedisClientType;
|
|
|
|
|
|
- private subClient?: RedisClientType;
|
|
|
+ private _subClient?: RedisClientType;
|
|
|
|
|
|
- private subscriptions: Record<string, ((message: any) => Promise<void>)[]>;
|
|
|
+ private _subscriptions: Record<string, ((message: any) => Promise<void>)[]>;
|
|
|
|
|
|
- private scheduleCallbacks: Record<string, (() => Promise<void>)[]>;
|
|
|
+ private _scheduleCallbacks: Record<string, (() => Promise<void>)[]>;
|
|
|
|
|
|
/**
|
|
|
* Events Module
|
|
@@ -20,8 +20,8 @@ export default class EventsModule extends BaseModule {
|
|
|
public constructor() {
|
|
|
super("events");
|
|
|
|
|
|
- this.subscriptions = {};
|
|
|
- this.scheduleCallbacks = {};
|
|
|
+ this._subscriptions = {};
|
|
|
+ this._scheduleCallbacks = {};
|
|
|
this.jobApiDefault = false;
|
|
|
}
|
|
|
|
|
@@ -31,8 +31,8 @@ export default class EventsModule extends BaseModule {
|
|
|
public override async startup() {
|
|
|
await super.startup();
|
|
|
|
|
|
- await this.createPubClient();
|
|
|
- await this.createSubClient();
|
|
|
+ await this._createPubClient();
|
|
|
+ await this._createSubClient();
|
|
|
|
|
|
await super.started();
|
|
|
}
|
|
@@ -40,12 +40,12 @@ export default class EventsModule extends BaseModule {
|
|
|
/**
|
|
|
* createPubClient - Create redis client for publishing
|
|
|
*/
|
|
|
- private async createPubClient() {
|
|
|
- this.pubClient = createClient({ ...config.get("redis") });
|
|
|
+ private async _createPubClient() {
|
|
|
+ this._pubClient = createClient({ ...config.get("redis") });
|
|
|
|
|
|
- await this.pubClient.connect();
|
|
|
+ await this._pubClient.connect();
|
|
|
|
|
|
- const redisConfigResponse = await this.pubClient.sendCommand([
|
|
|
+ const redisConfigResponse = await this._pubClient.sendCommand([
|
|
|
"CONFIG",
|
|
|
"GET",
|
|
|
"notify-keyspace-events"
|
|
@@ -69,22 +69,22 @@ export default class EventsModule extends BaseModule {
|
|
|
/**
|
|
|
* createSubClient - Create redis client for subscribing
|
|
|
*/
|
|
|
- private async createSubClient() {
|
|
|
- if (!this.pubClient) throw new Error("Redis pubClient unavailable.");
|
|
|
+ private async _createSubClient() {
|
|
|
+ if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
|
|
|
|
|
|
- this.subClient = this.pubClient?.duplicate();
|
|
|
+ this._subClient = this._pubClient?.duplicate();
|
|
|
|
|
|
- await this.subClient.connect();
|
|
|
+ await this._subClient.connect();
|
|
|
|
|
|
- const { database = 0 } = this.subClient.options ?? {};
|
|
|
+ const { database = 0 } = this._subClient.options ?? {};
|
|
|
|
|
|
- await this.subClient.PSUBSCRIBE(
|
|
|
+ await this._subClient.PSUBSCRIBE(
|
|
|
`__keyevent@${database}__:expired`,
|
|
|
async message => {
|
|
|
- if (!this.scheduleCallbacks[message]) return;
|
|
|
+ if (!this._scheduleCallbacks[message]) return;
|
|
|
|
|
|
await Promise.all(
|
|
|
- this.scheduleCallbacks[message].map(callback => callback())
|
|
|
+ this._scheduleCallbacks[message].map(callback => callback())
|
|
|
);
|
|
|
}
|
|
|
);
|
|
@@ -93,7 +93,7 @@ export default class EventsModule extends BaseModule {
|
|
|
/**
|
|
|
* createKey - Create hex key
|
|
|
*/
|
|
|
- private createKey(type: "event" | "schedule", channel: string) {
|
|
|
+ private _createKey(type: "event" | "schedule", channel: string) {
|
|
|
if (!["event", "schedule"].includes(type))
|
|
|
throw new Error("Invalid type");
|
|
|
|
|
@@ -113,25 +113,25 @@ export default class EventsModule extends BaseModule {
|
|
|
context: JobContext,
|
|
|
payload: { channel: string; value: any }
|
|
|
) {
|
|
|
- if (!this.pubClient) throw new Error("Redis pubClient unavailable.");
|
|
|
+ if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
|
|
|
|
|
|
let { channel, value } = payload;
|
|
|
|
|
|
- channel = this.createKey("event", channel);
|
|
|
+ channel = this._createKey("event", channel);
|
|
|
|
|
|
if (!value) throw new Error("Invalid value");
|
|
|
|
|
|
if (["object", "array"].includes(typeof value))
|
|
|
value = JSON.stringify(value);
|
|
|
|
|
|
- await this.pubClient.publish(channel, value);
|
|
|
+ await this._pubClient.publish(channel, value);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* subscriptionListener - Listener for event subscriptions
|
|
|
*/
|
|
|
- private async subscriptionListener(message: string, channel: string) {
|
|
|
- if (!this.subscriptions || !this.subscriptions[channel]) return;
|
|
|
+ private async _subscriptionListener(message: string, channel: string) {
|
|
|
+ if (!this._subscriptions || !this._subscriptions[channel]) return;
|
|
|
|
|
|
if (message.startsWith("[") || message.startsWith("{"))
|
|
|
try {
|
|
@@ -142,7 +142,7 @@ export default class EventsModule extends BaseModule {
|
|
|
else if (message.startsWith('"') && message.endsWith('"'))
|
|
|
message = message.substring(1).substring(0, message.length - 2);
|
|
|
|
|
|
- await Promise.all(this.subscriptions[channel].map(cb => cb(message)));
|
|
|
+ await Promise.all(this._subscriptions[channel].map(cb => cb(message)));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -157,44 +157,44 @@ export default class EventsModule extends BaseModule {
|
|
|
unique?: boolean;
|
|
|
}
|
|
|
) {
|
|
|
- if (!this.subClient) throw new Error("Redis subClient unavailable.");
|
|
|
+ if (!this._subClient) throw new Error("Redis subClient unavailable.");
|
|
|
|
|
|
const { type = "event", callback, unique = false } = payload;
|
|
|
|
|
|
- const channel = this.createKey(type, payload.channel);
|
|
|
+ const channel = this._createKey(type, payload.channel);
|
|
|
|
|
|
if (type === "schedule") {
|
|
|
if (
|
|
|
unique &&
|
|
|
- this.scheduleCallbacks[channel] &&
|
|
|
- this.scheduleCallbacks[channel].length > 0
|
|
|
+ this._scheduleCallbacks[channel] &&
|
|
|
+ this._scheduleCallbacks[channel].length > 0
|
|
|
)
|
|
|
return;
|
|
|
|
|
|
- if (!this.scheduleCallbacks[channel])
|
|
|
- this.scheduleCallbacks[channel] = [];
|
|
|
+ if (!this._scheduleCallbacks[channel])
|
|
|
+ this._scheduleCallbacks[channel] = [];
|
|
|
|
|
|
- this.scheduleCallbacks[channel].push(() => callback());
|
|
|
+ this._scheduleCallbacks[channel].push(() => callback());
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
if (
|
|
|
unique &&
|
|
|
- this.subscriptions[channel] &&
|
|
|
- this.subscriptions[channel].length > 0
|
|
|
+ this._subscriptions[channel] &&
|
|
|
+ this._subscriptions[channel].length > 0
|
|
|
)
|
|
|
return;
|
|
|
|
|
|
- if (!this.subscriptions[channel]) {
|
|
|
- this.subscriptions[channel] = [];
|
|
|
+ if (!this._subscriptions[channel]) {
|
|
|
+ this._subscriptions[channel] = [];
|
|
|
|
|
|
- await this.subClient.subscribe(channel, (...args) =>
|
|
|
- this.subscriptionListener(...args)
|
|
|
+ await this._subClient.subscribe(channel, (...args) =>
|
|
|
+ this._subscriptionListener(...args)
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- this.subscriptions[channel].push(callback);
|
|
|
+ this._subscriptions[channel].push(callback);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -208,33 +208,33 @@ export default class EventsModule extends BaseModule {
|
|
|
callback: (message?: any) => Promise<void>;
|
|
|
}
|
|
|
) {
|
|
|
- if (!this.subClient) throw new Error("Redis subClient unavailable.");
|
|
|
+ if (!this._subClient) throw new Error("Redis subClient unavailable.");
|
|
|
|
|
|
const { type = "event", callback } = payload;
|
|
|
- const channel = this.createKey(type, payload.channel);
|
|
|
+ const channel = this._createKey(type, payload.channel);
|
|
|
|
|
|
if (type === "schedule") {
|
|
|
- if (!this.scheduleCallbacks[channel]) return;
|
|
|
+ if (!this._scheduleCallbacks[channel]) return;
|
|
|
|
|
|
- const index = this.scheduleCallbacks[channel].indexOf(callback);
|
|
|
+ const index = this._scheduleCallbacks[channel].indexOf(callback);
|
|
|
|
|
|
- if (index >= 0) this.scheduleCallbacks[channel].splice(index, 1);
|
|
|
+ if (index >= 0) this._scheduleCallbacks[channel].splice(index, 1);
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if (!this.subscriptions[channel]) return;
|
|
|
+ if (!this._subscriptions[channel]) return;
|
|
|
|
|
|
- const index = this.subscriptions[channel].indexOf(callback);
|
|
|
+ const index = this._subscriptions[channel].indexOf(callback);
|
|
|
|
|
|
if (index < 0) return;
|
|
|
|
|
|
- this.subscriptions[channel].splice(index, 1);
|
|
|
+ this._subscriptions[channel].splice(index, 1);
|
|
|
|
|
|
- if (this.subscriptions[channel].length === 0) {
|
|
|
- delete this.subscriptions[channel];
|
|
|
- await this.subClient.unsubscribe(channel, (...args) =>
|
|
|
- this.subscriptionListener(...args)
|
|
|
+ if (this._subscriptions[channel].length === 0) {
|
|
|
+ delete this._subscriptions[channel];
|
|
|
+ await this._subClient.unsubscribe(channel, (...args) =>
|
|
|
+ this._subscriptionListener(...args)
|
|
|
);
|
|
|
}
|
|
|
}
|
|
@@ -249,7 +249,7 @@ export default class EventsModule extends BaseModule {
|
|
|
time: number;
|
|
|
}
|
|
|
) {
|
|
|
- if (!this.pubClient) throw new Error("Redis pubClient unavailable.");
|
|
|
+ if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
|
|
|
|
|
|
let { time } = payload;
|
|
|
|
|
@@ -259,9 +259,9 @@ export default class EventsModule extends BaseModule {
|
|
|
|
|
|
if (time <= 0) throw new Error("Time must be greater than 0");
|
|
|
|
|
|
- const channel = this.createKey("schedule", payload.channel);
|
|
|
+ const channel = this._createKey("schedule", payload.channel);
|
|
|
|
|
|
- await this.pubClient.set(channel, "", { PX: time, NX: true });
|
|
|
+ await this._pubClient.set(channel, "", { PX: time, NX: true });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -273,11 +273,11 @@ export default class EventsModule extends BaseModule {
|
|
|
channel: string;
|
|
|
}
|
|
|
) {
|
|
|
- if (!this.pubClient) throw new Error("Redis pubClient unavailable.");
|
|
|
+ if (!this._pubClient) throw new Error("Redis pubClient unavailable.");
|
|
|
|
|
|
- const channel = this.createKey("schedule", payload.channel);
|
|
|
+ const channel = this._createKey("schedule", payload.channel);
|
|
|
|
|
|
- await this.pubClient.del(channel);
|
|
|
+ await this._pubClient.del(channel);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -286,11 +286,11 @@ export default class EventsModule extends BaseModule {
|
|
|
public override async shutdown() {
|
|
|
await super.shutdown();
|
|
|
|
|
|
- if (this.pubClient) await this.pubClient.quit();
|
|
|
- if (this.subClient) await this.subClient.quit();
|
|
|
+ if (this._pubClient) await this._pubClient.quit();
|
|
|
+ if (this._subClient) await this._subClient.quit();
|
|
|
|
|
|
- this.subscriptions = {};
|
|
|
- this.scheduleCallbacks = {};
|
|
|
+ this._subscriptions = {};
|
|
|
+ this._scheduleCallbacks = {};
|
|
|
|
|
|
await this.stopped();
|
|
|
}
|