DataModule.ts 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. import config from "config";
  2. import mongoose, { Connection, Model, Schema, SchemaTypes } from "mongoose";
  3. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  4. import { readdir } from "fs/promises";
  5. import path from "path";
  6. import updateVersioningPlugin from "mongoose-update-versioning";
  7. import Migration from "@/modules/DataModule/Migration";
  8. import documentVersionPlugin from "@/modules/DataModule/plugins/documentVersion";
  9. import getDataPlugin from "@/modules/DataModule/plugins/getData";
  10. import BaseModule, { ModuleStatus } from "@/BaseModule";
  11. import EventsModule from "./EventsModule";
  12. import DataModuleJob from "./DataModule/DataModuleJob";
  13. import Job from "@/Job";
  14. import { forEachIn } from "@/utils/forEachIn";
  /**
   * DataModule - Owns the Mongo connection and the Mongoose models built on
   * it, runs model migrations at startup, and forwards model change events
   * to the events module.
   */
  export class DataModule extends BaseModule {
    private _models?: Record<string, Model<any>>;

    private _mongoConnection?: Connection;

    declare _jobs: Record<string, typeof Job | typeof DataModuleJob>;

    /**
     * Data Module
     */
    public constructor() {
      super("data");

      this._dependentModules = ["events"];
    }

    /**
     * startup - Startup data module
     *
     * Order matters: the connection must exist before migrations run, and
     * models must be loaded before their indexes and jobs can be handled.
     */
    public override async startup() {
      await super.startup();

      await this._createMongoConnection();

      await this._runMigrations();

      await this._loadModels();

      await this._syncModelIndexes();

      await this._loadModelJobs();

      await super._started();
    }

    /**
     * shutdown - Shutdown data module
     *
     * Removes every listener from the shared patch event emitter and closes
     * the Mongo connection if one was opened.
     */
    public override async shutdown() {
      await super.shutdown();

      patchEventEmitter.removeAllListeners();

      if (this._mongoConnection) await this._mongoConnection.close();

      await this._stopped();
    }

    /**
     * createMongoConnection - Create mongo connection
     *
     * Applies the global mongoose options, then opens a dedicated connection
     * using the "mongo" section of the config.
     */
    private async _createMongoConnection() {
      mongoose.set({
        runValidators: true,
        sanitizeFilter: true,
        strict: "throw",
        strictQuery: "throw"
      });

      const { user, password, host, port, database } = config.get<{
        user: string;
        password: string;
        host: string;
        port: number;
        database: string;
      }>("mongo");

      // Percent-encode the credentials so reserved URI characters in them
      // (e.g. "@", ":", "/", "%") cannot corrupt the connection string
      const credentials = `${encodeURIComponent(user)}:${encodeURIComponent(
        password
      )}`;

      const mongoUrl = `mongodb://${credentials}@${host}:${port}/${database}`;

      this._mongoConnection = await mongoose
        .createConnection(mongoUrl)
        .asPromise();
    }

    /**
     * registerEvents - Register events for schema with event module
     *
     * For each configured patch history event (created/updated/deleted) a
     * listener is attached that republishes the change on the channel
     * `model.<modelName>.<action>`, and, for every action except "created",
     * also on `model.<modelName>.<action>.<modelId>`.
     *
     * @param modelName - Name of the model the schema belongs to
     * @param schema - Schema whose "patchHistory" option is read
     */
    private async _registerEvents(modelName: string, schema: Schema<any>) {
      const { enabled, eventCreated, eventUpdated, eventDeleted } =
        schema.get("patchHistory") ?? {};

      if (!enabled) return;

      Object.entries({
        created: eventCreated,
        updated: eventUpdated,
        deleted: eventDeleted
      })
        // Skip actions that have no event name configured
        .filter(([, event]) => !!event)
        .forEach(([action, event]) => {
          patchEventEmitter.on(event, async ({ doc, oldDoc }) => {
            const modelId = doc?._id ?? oldDoc?._id;

            const Model = await this.getModel(modelName);

            // Turn the plain payloads back into model documents
            if (doc) doc = Model.hydrate(doc);

            if (oldDoc) oldDoc = Model.hydrate(oldDoc);

            if (!modelId && action !== "created")
              throw new Error(`Model Id not found for "${event}"`);

            const channel = `model.${modelName}.${action}`;

            await EventsModule.publish(channel, { doc, oldDoc });

            if (action !== "created")
              await EventsModule.publish(`${channel}.${modelId}`, {
                doc,
                oldDoc
              });
          });
        });
    }

    /**
     * registerEventListeners - Subscribe the listeners declared on a schema
     *
     * Reads the schema's "eventListeners" option (a map of event name to
     * callback) and subscribes each entry through the events module. Does
     * nothing when the option is missing, null, not an object, or empty.
     *
     * @param schema - Schema whose "eventListeners" option is read
     */
    private async _registerEventListeners(schema: Schema<any>) {
      const eventListeners = schema.get("eventListeners");

      // typeof null is "object", so null is rejected explicitly before
      // Object.keys is called on it
      if (
        !eventListeners ||
        typeof eventListeners !== "object" ||
        Object.keys(eventListeners).length === 0
      )
        return;

      await forEachIn(
        Object.entries(eventListeners),
        async ([event, callback]) =>
          EventsModule.subscribe("event", event, callback)
      );
    }

    /**
     * loadModel - Import and load model schema
     *
     * Imports the schema module for the model, applies the shared plugins
     * and schema options, registers its events and listeners, adds getters
     * on ObjectId reference paths, and registers the model on the Mongo
     * connection.
     *
     * @param modelName - Name of the model
     * @returns Model
     * @throws Error if the Mongo connection has not been created
     */
    private async _loadModel(modelName: string): Promise<Model<any>> {
      if (!this._mongoConnection) throw new Error("Mongo is not available");

      const { schema }: { schema: Schema<any> } = await import(
        `./DataModule/models/${modelName}/schema`
      );

      schema.plugin(documentVersionPlugin);

      // Timestamps default to on unless the schema set the option itself
      schema.set("timestamps", schema.get("timestamps") ?? true);

      // Defaults first, so anything the schema declares takes precedence
      const patchHistoryConfig = {
        enabled: true,
        patchHistoryDisabled: true,
        eventCreated: `${modelName}.created`,
        eventUpdated: `${modelName}.updated`,
        eventDeleted: `${modelName}.deleted`,
        ...(schema.get("patchHistory") ?? {})
      };

      schema.set("patchHistory", patchHistoryConfig);

      if (patchHistoryConfig.enabled) {
        schema.plugin(patchHistoryPlugin, patchHistoryConfig);
      }

      const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};

      if (getDataEnabled) schema.plugin(getDataPlugin);

      await this._registerEvents(modelName, schema);

      await this._registerEventListeners(schema);

      schema.set("toObject", { getters: true, virtuals: true });
      schema.set("toJSON", { getters: true, virtuals: true });

      // Expose the model name on every document
      schema.virtual("_name").get(() => modelName);

      schema.plugin(updateVersioningPlugin);

      // For every ObjectId path (or array of ObjectIds) that declares a
      // "ref", add a getter that wraps each id as { _id, _name: ref }
      await forEachIn(
        Object.entries(schema.paths).filter(
          ([, type]) =>
            type instanceof SchemaTypes.ObjectId ||
            (type instanceof SchemaTypes.Array &&
              type.caster instanceof SchemaTypes.ObjectId)
        ),
        async ([key, type]) => {
          const { ref } =
            (type instanceof SchemaTypes.ObjectId
              ? type?.options
              : type.caster?.options) ?? {};

          if (ref)
            schema.path(key).get(value => {
              // A null reference stays null, matching how null items are
              // preserved in the array branch below
              if (
                value !== null &&
                typeof value === "object" &&
                type instanceof SchemaTypes.ObjectId
              )
                return {
                  _id: value,
                  _name: ref
                };

              if (
                Array.isArray(value) &&
                type instanceof SchemaTypes.Array
              )
                return value.map(item =>
                  item === null
                    ? null
                    : {
                        _id: item,
                        _name: ref
                      }
                );

              return value;
            });
        }
      );

      return this._mongoConnection.model(modelName, schema);
    }

    /**
     * loadModels - Load and initialize all models
     *
     * @returns Promise
     */
    private async _loadModels() {
      mongoose.SchemaTypes.String.set("trim", true);

      this._models = {
        abc: await this._loadModel("abc"),
        news: await this._loadModel("news"),
        sessions: await this._loadModel("sessions"),
        stations: await this._loadModel("stations"),
        users: await this._loadModel("users")
      };
    }

    /**
     * syncModelIndexes - Sync indexes for all models
     *
     * @throws Error if the models have not been loaded
     */
    private async _syncModelIndexes() {
      if (!this._models) throw new Error("Models not loaded");

      await forEachIn(Object.values(this._models), model =>
        model.syncIndexes()
      );
    }

    /**
     * getModel - Get model
     *
     * @param name - Name the model was registered under
     * @returns Model
     * @throws Error if the models are not loaded, the module is not started,
     * or no model is registered under the given name
     */
    public async getModel<ModelType extends Model<any>>(
      name: string
    ): Promise<ModelType> {
      if (!this._models) throw new Error("Models not loaded");

      if (this.getStatus() !== ModuleStatus.STARTED)
        throw new Error("Module not started");

      // Own-property check so inherited keys such as "constructor" or
      // "toString" are never mistaken for a registered model
      if (
        !Object.prototype.hasOwnProperty.call(this._models, name) ||
        !this._models[name]
      )
        throw new Error("Model not found");

      return this._models[name] as ModelType;
    }

    /**
     * readDirectoryIfExists - List a directory that is allowed to be absent
     *
     * @param directory - Absolute path of the directory to list
     * @returns Entry names, or null when the path does not exist (ENOENT) or
     * one of its components is not a directory (ENOTDIR)
     */
    private async _readDirectoryIfExists(
      directory: string
    ): Promise<string[] | null> {
      try {
        return await readdir(directory);
      } catch (error) {
        if (
          error instanceof Error &&
          "code" in error &&
          (error.code === "ENOENT" || error.code === "ENOTDIR")
        )
          return null;

        throw error;
      }
    }

    /**
     * loadModelMigrations - Instantiate the migrations of a single model
     *
     * @param modelName - Name of the entry inside the models directory
     * @returns Migration instances in file name order, or an empty array
     * when the model has no migrations directory
     * @throws Error if the Mongo connection has not been created
     */
    private async _loadModelMigrations(modelName: string) {
      if (!this._mongoConnection) throw new Error("Mongo is not available");

      const migrations = await this._readDirectoryIfExists(
        path.resolve(
          __dirname,
          `./DataModule/models/${modelName}/migrations/`
        )
      );

      if (!migrations) return [];

      // Sort by file name so migrations run in a deterministic order
      // instead of whatever order the directory listing happens to return
      migrations.sort();

      return forEachIn(migrations, async migrationFile => {
        const { default: Migrate }: { default: typeof Migration } =
          await import(
            `./DataModule/models/${modelName}/migrations/${migrationFile}`
          );

        return new Migrate(this._mongoConnection as Connection);
      });
    }

    /**
     * loadMigrations - Instantiate the migrations of every model
     *
     * @returns One array of migrations per entry of the models directory
     */
    private async _loadMigrations() {
      const models = await readdir(
        path.resolve(__dirname, "./DataModule/models/")
      );

      return forEachIn(models, async modelName =>
        this._loadModelMigrations(modelName)
      );
    }

    /**
     * runMigrations - Run every migration, one at a time
     */
    private async _runMigrations() {
      const migrations = (await this._loadMigrations()).flat();

      for (let i = 0; i < migrations.length; i += 1) {
        const migration = migrations[i];

        // Sequential on purpose: each migration finishes before the next
        // eslint-disable-next-line no-await-in-loop
        await migration.up();
      }
    }

    /**
     * loadModelJobs - Register the jobs found in each model's jobs directory
     *
     * The directory is derived from the class name. Each file is imported
     * and its default export stored in `_jobs` under the name returned by
     * its `getName()`. Models without a jobs directory are skipped.
     *
     * @throws Error if the models have not been loaded
     */
    private async _loadModelJobs() {
      if (!this._models) throw new Error("Models not loaded");

      await forEachIn(Object.keys(this._models), async modelName => {
        const jobs = await this._readDirectoryIfExists(
          path.resolve(
            __dirname,
            `./${this.constructor.name}/models/${modelName}/jobs/`
          )
        );

        if (!jobs) return;

        await forEachIn(jobs, async jobFile => {
          const { default: ModelJob } = await import(
            `./${this.constructor.name}/models/${modelName}/jobs/${jobFile}`
          );

          this._jobs[ModelJob.getName()] = ModelJob;
        });
      });
    }
  }

  export default new DataModule();