DataModule.ts 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. import config from "config";
  2. // import { createClient, RedisClientType } from "redis";
  3. import mongoose, {
  4. Connection,
  5. MongooseDefaultQueryMiddleware,
  6. MongooseDistinctQueryMiddleware,
  7. MongooseQueryOrDocumentMiddleware
  8. } from "mongoose";
  9. import JobContext from "../JobContext";
  10. import BaseModule, { ModuleStatus } from "../BaseModule";
  11. import { UniqueMethods } from "../types/Modules";
  12. import { Models, Schemas } from "../types/Models";
  13. import getDataPlugin from "../schemas/plugins/getData";
  14. export default class DataModule extends BaseModule {
  15. private models?: Models;
  16. private mongoConnection?: Connection;
  17. // private redisClient?: RedisClientType;
  18. /**
  19. * Data Module
  20. */
  21. public constructor() {
  22. super("data");
  23. }
  24. /**
  25. * startup - Startup data module
  26. */
  27. public override async startup() {
  28. await super.startup();
  29. const { user, password, host, port, database } = config.get<{
  30. user: string;
  31. password: string;
  32. host: string;
  33. port: number;
  34. database: string;
  35. }>("mongo");
  36. const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
  37. this.mongoConnection = await mongoose
  38. .createConnection(mongoUrl)
  39. .asPromise();
  40. this.mongoConnection.set("runValidators", true);
  41. this.mongoConnection.set("sanitizeFilter", true);
  42. this.mongoConnection.set("strict", "throw");
  43. this.mongoConnection.set("strictQuery", "throw");
  44. mongoose.SchemaTypes.String.set("trim", true);
  45. this.mongoConnection.plugin(getDataPlugin, {
  46. tags: ["useGetDataPlugin"]
  47. });
  48. await this.loadModels();
  49. await this.syncModelIndexes();
  50. // @ts-ignore
  51. // this.redisClient = createClient({ ...config.get("redis") });
  52. //
  53. // await this.redisClient.connect();
  54. //
  55. // const redisConfigResponse = await this.redisClient.sendCommand([
  56. // "CONFIG",
  57. // "GET",
  58. // "notify-keyspace-events"
  59. // ]);
  60. //
  61. // if (
  62. // !(
  63. // Array.isArray(redisConfigResponse) &&
  64. // redisConfigResponse[1] === "xE"
  65. // )
  66. // )
  67. // throw new Error(
  68. // `notify-keyspace-events is NOT configured correctly! It is set to: ${
  69. // (Array.isArray(redisConfigResponse) &&
  70. // redisConfigResponse[1]) ||
  71. // "unknown"
  72. // }`
  73. // );
  74. await super.started();
  75. }
  76. /**
  77. * shutdown - Shutdown data module
  78. */
  79. public override async shutdown() {
  80. await super.shutdown();
  81. // if (this.redisClient) await this.redisClient.quit();
  82. if (this.mongoConnection) await this.mongoConnection.close();
  83. }
  84. /**
  85. * loadModel - Import and load model schema
  86. *
  87. * @param modelName - Name of the model
  88. * @returns Model
  89. */
  90. private async loadModel<ModelName extends keyof Models>(
  91. modelName: ModelName
  92. ) {
  93. if (!this.mongoConnection) throw new Error("Mongo is not available");
  94. const { schema }: { schema: Schemas[ModelName] } = await import(
  95. `../schemas/${modelName.toString()}`
  96. );
  97. const preMethods: string[] = [
  98. "aggregate",
  99. "count",
  100. "countDocuments",
  101. "deleteOne",
  102. "deleteMany",
  103. "estimatedDocumentCount",
  104. "find",
  105. "findOne",
  106. "findOneAndDelete",
  107. "findOneAndRemove",
  108. "findOneAndReplace",
  109. "findOneAndUpdate",
  110. "init",
  111. "insertMany",
  112. "remove",
  113. "replaceOne",
  114. "save",
  115. "update",
  116. "updateOne",
  117. "updateMany",
  118. "validate"
  119. ];
  120. preMethods.forEach(preMethod => {
  121. // @ts-ignore
  122. schema.pre(preMethods, () => {
  123. console.log(`Pre-${preMethod}!`);
  124. });
  125. });
  126. return this.mongoConnection.model(modelName.toString(), schema);
  127. }
  128. /**
  129. * loadModels - Load and initialize all models
  130. *
  131. * @returns Promise
  132. */
  133. private async loadModels() {
  134. this.models = {
  135. abc: await this.loadModel("abc"),
  136. news: await this.loadModel("news"),
  137. station: await this.loadModel("station")
  138. };
  139. }
  140. /**
  141. * syncModelIndexes - Sync indexes for all models
  142. */
  143. private async syncModelIndexes() {
  144. if (!this.models) throw new Error("Models not loaded");
  145. await Promise.all(
  146. Object.values(this.models).map(model => model.syncIndexes())
  147. );
  148. }
  149. /**
  150. * getModel - Get model
  151. *
  152. * @returns Model
  153. */
  154. public async getModel<ModelName extends keyof Models>(
  155. jobContext: JobContext,
  156. payload: ModelName | { name: ModelName }
  157. ) {
  158. if (!this.models) throw new Error("Models not loaded");
  159. if (this.getStatus() !== ModuleStatus.STARTED)
  160. throw new Error("Module not started");
  161. const name = typeof payload === "object" ? payload.name : payload;
  162. return this.models[name];
  163. }
  164. }
  165. export type DataModuleJobs = {
  166. [Property in keyof UniqueMethods<DataModule>]: {
  167. payload: Parameters<UniqueMethods<DataModule>[Property]>[1];
  168. returns: Awaited<ReturnType<UniqueMethods<DataModule>[Property]>>;
  169. };
  170. };