123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- import async from "async";
- import config from "config";
- import redis from "redis";
- import mongoose from "mongoose";
- import CoreClass from "../../core";
- const pubs = {};
- const subs = {};
- let CacheModule;
- class _CacheModule extends CoreClass {
-
- constructor() {
- super("cache");
- CacheModule = this;
- }
-
- async initialize() {
- const importSchema = schemaName =>
- new Promise(resolve => {
- import(`./schemas/${schemaName}`).then(schema => resolve(schema.default));
- });
- this.schemas = {
- session: await importSchema("session"),
- station: await importSchema("station"),
- playlist: await importSchema("playlist"),
- officialPlaylist: await importSchema("officialPlaylist"),
- song: await importSchema("song"),
- punishment: await importSchema("punishment"),
- recentActivity: await importSchema("recentActivity"),
- ratings: await importSchema("ratings")
- };
- return new Promise((resolve, reject) => {
- this.log("INFO", "Connecting...");
- this.client = redis.createClient({
- ...config.get("redis"),
- reconnectStrategy: retries => {
- if (this.getStatus() !== "LOCKDOWN") {
- if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");
- this.log("INFO", `Attempting to reconnect.`);
- if (retries >= 10) {
- this.log("ERROR", `Stopped trying to reconnect.`);
- this.setStatus("FAILED");
- new Error("Stopped trying to reconnect.");
- } else {
- Math.min(retries * 50, 500);
- }
- }
- }
- });
- this.client.on("error", err => {
- if (this.getStatus() === "INITIALIZING") reject(err);
- if (this.getStatus() === "LOCKDOWN") return;
- this.log("ERROR", `Error ${err.message}.`);
- });
- this.client.on("ready", () => {
- this.log("INFO", "Redis is ready.");
- if (this.getStatus() === "INITIALIZING") resolve();
- else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");
- });
- this.client.connect().then(async () => {
- this.log("INFO", "Connected succesfully.");
- });
-
- CacheModule.runJob("KEYS", { pattern: "longJobs.*" }).then(keys => {
- async.eachLimit(keys, 1, (key, next) => {
- CacheModule.runJob("DEL", { key }).finally(() => {
- next();
- });
- });
- });
- });
- }
-
- QUIT() {
- return new Promise(resolve => {
- if (CacheModule.client.connected) {
- CacheModule.client.quit();
- Object.keys(pubs).forEach(channel => pubs[channel].quit());
- Object.keys(subs).forEach(channel => subs[channel].client.quit());
- }
- resolve();
- });
- }
-
- SET(payload) {
- return new Promise((resolve, reject) => {
- let { key, value } = payload;
- const { ttl } = payload;
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
-
- if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
- let options = null;
- if (ttl) {
- options = {
- EX: ttl
- };
- }
- CacheModule.client
- .SET(key, value, options)
- .then(() => {
- let parsed = value;
- try {
- parsed = JSON.parse(value);
- } catch {
-
- }
- resolve(parsed);
- })
- .catch(err => reject(new Error(err)));
- });
- }
-
- HSET(payload) {
- return new Promise((resolve, reject) => {
- let { key } = payload;
- let { value } = payload;
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
-
- if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
- CacheModule.client
- .HSET(payload.table, key, value)
- .then(() => resolve(JSON.parse(value)))
- .catch(err => reject(new Error(err)));
- });
- }
-
- GET(payload) {
- return new Promise((resolve, reject) => {
- let { key } = payload;
- if (!key) {
- reject(new Error("Invalid key!"));
- return;
- }
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
- CacheModule.client
- .GET(key, payload.value)
- .then(value => {
- if (value && !value.startsWith("{") && !value.startsWith("[")) return resolve(value);
- let parsedValue;
- try {
- parsedValue = JSON.parse(value);
- } catch (err) {
- return reject(err);
- }
- return resolve(parsedValue);
- })
- .catch(err => reject(new Error(err)));
- });
- }
-
- HGET(payload) {
- return new Promise((resolve, reject) => {
- let { key } = payload;
- if (!key) {
- reject(new Error("Invalid key!"));
- return;
- }
- if (!payload.table) {
- reject(new Error("Invalid table!"));
- return;
- }
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
- CacheModule.client
- .HGET(payload.table, key, payload.value)
- .then(value => {
- let parsedValue;
- try {
- parsedValue = JSON.parse(value);
- } catch (err) {
- return reject(err);
- }
- return resolve(parsedValue);
- })
- .catch(err => reject(new Error(err)));
- });
- }
-
- HDEL(payload) {
- return new Promise((resolve, reject) => {
- let { key } = payload;
- if (!payload.table) {
- reject(new Error("Invalid table!"));
- return;
- }
- if (!key) {
- reject(new Error("Invalid key!"));
- return;
- }
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
- CacheModule.client
- .HDEL(payload.table, key)
- .then(() => resolve())
- .catch(err => reject(new Error(err)));
- });
- }
-
- HGETALL(payload) {
- return new Promise((resolve, reject) => {
- if (!payload.table) {
- reject(new Error("Invalid table!"));
- return;
- }
- CacheModule.client
- .HGETALL(payload.table)
- .then(obj => {
- if (obj)
- Object.keys(obj).forEach(key => {
- obj[key] = JSON.parse(obj[key]);
- });
- else if (!obj) obj = [];
- resolve(obj);
- })
- .catch(err => reject(new Error(err)));
- });
- }
-
- DEL(payload) {
- return new Promise((resolve, reject) => {
- let { key } = payload;
- if (!key) {
- reject(new Error("Invalid key!"));
- return;
- }
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
- CacheModule.client
- .DEL(key)
- .then(() => resolve())
- .catch(err => reject(new Error(err)));
- });
- }
-
- PUB(payload) {
- return new Promise((resolve, reject) => {
- let { value } = payload;
- if (!payload.channel) {
- reject(new Error("Invalid channel!"));
- return;
- }
- if (!value) {
- reject(new Error("Invalid value!"));
- return;
- }
- if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
- CacheModule.client
- .publish(payload.channel, value)
- .then(() => resolve())
- .catch(err => reject(new Error(err)));
- });
- }
-
- SUB(payload) {
- return new Promise((resolve, reject) => {
- if (!payload.channel) {
- reject(new Error("Invalid channel!"));
- return;
- }
- if (subs[payload.channel] === undefined) {
- subs[payload.channel] = {
- client: redis.createClient(config.get("redis")),
- cbs: []
- };
- subs[payload.channel].client.connect().then(() => {
- subs[payload.channel].client.subscribe(payload.channel, (message, channel) => {
- if (message.startsWith("[") || message.startsWith("{"))
- try {
- message = JSON.parse(message);
- } catch (err) {
- console.error(err);
- }
- else if (message.startsWith('"') && message.endsWith('"'))
- message = message.substring(1).substring(0, message.length - 2);
- subs[channel].cbs.forEach(cb => cb(message));
- });
- });
- }
- subs[payload.channel].cbs.push(payload.cb);
- resolve();
- });
- }
-
- LRANGE(payload) {
- return new Promise((resolve, reject) => {
- let { key } = payload;
- if (!key) {
- reject(new Error("Invalid key!"));
- return;
- }
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
- CacheModule.client
- .LRANGE(key, 0, -1)
- .then(list => resolve(list))
- .catch(err => reject(new Error(err)));
- });
- }
-
- RPUSH(payload) {
- return new Promise((resolve, reject) => {
- let { key, value } = payload;
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
-
- if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
- CacheModule.client
- .RPUSH(key, value)
- .then(() => resolve())
- .catch(err => reject(new Error(err)));
- });
- }
-
- LPUSH(payload) {
- return new Promise((resolve, reject) => {
- let { key, value } = payload;
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
-
- if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
- CacheModule.client
- .LPUSH(key, value)
- .then(() => resolve())
- .catch(err => reject(new Error(err)));
- });
- }
-
- LLEN(payload) {
- return new Promise((resolve, reject) => {
- const { key } = payload;
- CacheModule.client
- .LLEN(key)
- .then(len => resolve(len))
- .catch(err => reject(new Error(err)));
- });
- }
-
- RPOP(payload) {
- return new Promise((resolve, reject) => {
- const { key } = payload;
- CacheModule.client
- .RPOP(key)
- .then(() => resolve())
- .catch(err => reject(new Error(err)));
- });
- }
-
- LREM(payload) {
- return new Promise((resolve, reject) => {
- let { key, value } = payload;
- if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
-
- if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
- CacheModule.client
- .LREM(key, 1, value)
- .then(() => resolve())
- .catch(err => reject(new Error(err)));
- });
- }
-
- KEYS(payload) {
- return new Promise((resolve, reject) => {
- const { pattern } = payload;
- CacheModule.client
- .KEYS(pattern)
- .then(keys => resolve(keys))
- .catch(err => reject(new Error(err)));
- });
- }
-
- GET_SCHEMA(payload) {
- return new Promise(resolve => {
- resolve(CacheModule.schemas[payload.schemaName]);
- });
- }
- }
- export default new _CacheModule();
|