DataModule.ts 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. import config from "config";
  2. import mongoose, { Connection, Model, Schema, SchemaTypes } from "mongoose";
  3. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  4. import { readdir } from "fs/promises";
  5. import path from "path";
  6. import updateVersioningPlugin from "mongoose-update-versioning";
  7. import { forEachIn } from "@common/utils/forEachIn";
  8. import Migration from "@/modules/DataModule/Migration";
  9. import documentVersionPlugin from "@/modules/DataModule/plugins/documentVersion";
  10. import getDataPlugin from "@/modules/DataModule/plugins/getData";
  11. import BaseModule, { ModuleStatus } from "@/BaseModule";
  12. import EventsModule from "./EventsModule";
  13. import DataModuleJob from "./DataModule/DataModuleJob";
  14. import Job from "@/Job";
  15. export class DataModule extends BaseModule {
  16. private _models?: Record<string, Model<any>>;
  17. private _mongoConnection?: Connection;
  18. declare _jobs: Record<string, typeof Job | typeof DataModuleJob>;
  19. /**
  20. * Data Module
  21. */
  22. public constructor() {
  23. super("data");
  24. this._dependentModules = ["events"];
  25. }
  26. /**
  27. * startup - Startup data module
  28. */
  29. public override async startup() {
  30. await super.startup();
  31. await this._createMongoConnection();
  32. await this._runMigrations();
  33. await this._loadModels();
  34. await this._syncModelIndexes();
  35. await this._loadModelJobs();
  36. await super._started();
  37. }
  38. /**
  39. * shutdown - Shutdown data module
  40. */
  41. public override async shutdown() {
  42. await super.shutdown();
  43. patchEventEmitter.removeAllListeners();
  44. if (this._mongoConnection) await this._mongoConnection.close();
  45. await this._stopped();
  46. }
  47. /**
  48. * createMongoConnection - Create mongo connection
  49. */
  50. private async _createMongoConnection() {
  51. mongoose.set({
  52. runValidators: true,
  53. sanitizeFilter: true,
  54. strict: "throw",
  55. strictQuery: "throw"
  56. });
  57. const { user, password, host, port, database } = config.get<{
  58. user: string;
  59. password: string;
  60. host: string;
  61. port: number;
  62. database: string;
  63. }>("mongo");
  64. const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
  65. this._mongoConnection = await mongoose
  66. .createConnection(mongoUrl)
  67. .asPromise();
  68. }
  69. /**
  70. * registerEvents - Register events for schema with event module
  71. */
  72. private async _registerEvents(modelName: string, schema: Schema<any>) {
  73. const { enabled, eventCreated, eventUpdated, eventDeleted } =
  74. schema.get("patchHistory") ?? {};
  75. if (!enabled) return;
  76. Object.entries({
  77. created: eventCreated,
  78. updated: eventUpdated,
  79. deleted: eventDeleted
  80. })
  81. .filter(([, event]) => !!event)
  82. .forEach(([action, event]) => {
  83. patchEventEmitter.on(event!, async ({ doc, oldDoc }) => {
  84. const modelId = doc?._id ?? oldDoc?._id;
  85. const Model = await this.getModel(modelName);
  86. if (doc) doc = Model.hydrate(doc);
  87. if (oldDoc) oldDoc = Model.hydrate(oldDoc);
  88. if (!modelId && action !== "created")
  89. throw new Error(`Model Id not found for "${event}"`);
  90. const channel = `model.${modelName}.${action}`;
  91. await EventsModule.publish(channel, { doc, oldDoc });
  92. if (action !== "created")
  93. await EventsModule.publish(`${channel}.${modelId}`, {
  94. doc,
  95. oldDoc
  96. });
  97. });
  98. });
  99. }
  100. /**
  101. * registerEvents - Register events for schema with event module
  102. */
  103. private async _registerEventListeners(schema: Schema<any>) {
  104. const eventListeners = schema.get("eventListeners");
  105. if (
  106. typeof eventListeners !== "object" ||
  107. Object.keys(eventListeners).length === 0
  108. )
  109. return;
  110. await forEachIn(
  111. Object.entries(eventListeners),
  112. async ([event, callback]) =>
  113. EventsModule.subscribe("event", event, callback)
  114. );
  115. }
  116. /**
  117. * loadModel - Import and load model schema
  118. *
  119. * @param modelName - Name of the model
  120. * @returns Model
  121. */
  122. private async _loadModel(modelName: string): Promise<Model<any>> {
  123. if (!this._mongoConnection) throw new Error("Mongo is not available");
  124. const { schema }: { schema: Schema<any> } = await import(
  125. `./DataModule/models/${modelName.toString()}/schema`
  126. );
  127. schema.plugin(documentVersionPlugin);
  128. schema.set("timestamps", schema.get("timestamps") ?? true);
  129. const patchHistoryConfig = {
  130. enabled: true,
  131. patchHistoryDisabled: true,
  132. eventCreated: `${modelName}.created`,
  133. eventUpdated: `${modelName}.updated`,
  134. eventDeleted: `${modelName}.deleted`,
  135. ...(schema.get("patchHistory") ?? {})
  136. };
  137. schema.set("patchHistory", patchHistoryConfig);
  138. if (patchHistoryConfig.enabled) {
  139. schema.plugin(patchHistoryPlugin, patchHistoryConfig);
  140. }
  141. const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
  142. if (getDataEnabled) schema.plugin(getDataPlugin);
  143. await this._registerEvents(modelName, schema);
  144. await this._registerEventListeners(schema);
  145. schema.set("toObject", { getters: true, virtuals: true });
  146. schema.set("toJSON", { getters: true, virtuals: true });
  147. schema.virtual("_name").get(() => modelName);
  148. schema.plugin(updateVersioningPlugin);
  149. await forEachIn(
  150. Object.entries(schema.paths).filter(
  151. ([, type]) =>
  152. type instanceof SchemaTypes.ObjectId ||
  153. (type instanceof SchemaTypes.Array &&
  154. type.caster instanceof SchemaTypes.ObjectId)
  155. ),
  156. async ([key, type]) => {
  157. const { ref } =
  158. (type instanceof SchemaTypes.Array
  159. ? type.caster?.options
  160. : type?.options) ?? {};
  161. if (ref)
  162. schema.path(key).get((value: any) => {
  163. if (
  164. typeof value === "object" &&
  165. type instanceof SchemaTypes.ObjectId
  166. )
  167. return {
  168. _id: value,
  169. _name: ref
  170. };
  171. if (
  172. Array.isArray(value) &&
  173. type instanceof SchemaTypes.Array
  174. )
  175. return value.map(item =>
  176. item === null
  177. ? null
  178. : {
  179. _id: item,
  180. _name: ref
  181. }
  182. );
  183. return value;
  184. });
  185. }
  186. );
  187. return this._mongoConnection.model(modelName.toString(), schema);
  188. }
  189. /**
  190. * loadModels - Load and initialize all models
  191. *
  192. * @returns Promise
  193. */
  194. private async _loadModels() {
  195. mongoose.SchemaTypes.String.set("trim", true);
  196. this._models = {
  197. abc: await this._loadModel("abc"),
  198. minifiedUsers: await this._loadModel("minifiedUsers"),
  199. news: await this._loadModel("news"),
  200. sessions: await this._loadModel("sessions"),
  201. stations: await this._loadModel("stations"),
  202. users: await this._loadModel("users")
  203. };
  204. }
  205. /**
  206. * syncModelIndexes - Sync indexes for all models
  207. */
  208. private async _syncModelIndexes() {
  209. if (!this._models) throw new Error("Models not loaded");
  210. await forEachIn(
  211. Object.values(this._models).filter(
  212. model => model.schema.get("autoIndex") !== false
  213. ),
  214. model => model.syncIndexes()
  215. );
  216. }
  217. /**
  218. * getModel - Get model
  219. *
  220. * @returns Model
  221. */
  222. public async getModel<ModelType extends Model<any>>(
  223. name: string
  224. ): Promise<ModelType> {
  225. if (!this._models) throw new Error("Models not loaded");
  226. if (this.getStatus() !== ModuleStatus.STARTED)
  227. throw new Error("Module not started");
  228. if (!this._models[name]) throw new Error("Model not found");
  229. return this._models[name] as ModelType;
  230. }
  231. private async _loadModelMigrations(modelName: string) {
  232. if (!this._mongoConnection) throw new Error("Mongo is not available");
  233. let migrations;
  234. try {
  235. migrations = await readdir(
  236. path.resolve(
  237. __dirname,
  238. `./DataModule/models/${modelName}/migrations/`
  239. )
  240. );
  241. } catch (error) {
  242. if (
  243. error instanceof Error &&
  244. "code" in error &&
  245. error.code === "ENOENT"
  246. )
  247. return [];
  248. throw error;
  249. }
  250. return forEachIn(migrations, async migrationFile => {
  251. const { default: Migrate }: { default: typeof Migration } =
  252. await import(
  253. `./DataModule/models/${modelName}/migrations/${migrationFile}`
  254. );
  255. return new Migrate(this._mongoConnection as Connection);
  256. });
  257. }
  258. private async _loadMigrations() {
  259. const models = await readdir(
  260. path.resolve(__dirname, "./DataModule/models/")
  261. );
  262. return forEachIn(models, async modelName =>
  263. this._loadModelMigrations(modelName)
  264. );
  265. }
  266. private async _runMigrations() {
  267. const migrations = (await this._loadMigrations()).flat();
  268. for (let i = 0; i < migrations.length; i += 1) {
  269. const migration = migrations[i];
  270. // eslint-disable-next-line no-await-in-loop
  271. await migration.up();
  272. }
  273. }
  274. private async _loadModelJobs() {
  275. if (!this._models) throw new Error("Models not loaded");
  276. await forEachIn(Object.keys(this._models), async modelName => {
  277. let jobs;
  278. try {
  279. jobs = await readdir(
  280. path.resolve(
  281. __dirname,
  282. `./${this.constructor.name}/models/${modelName}/jobs/`
  283. )
  284. );
  285. } catch (error) {
  286. if (
  287. error instanceof Error &&
  288. "code" in error &&
  289. error.code === "ENOENT"
  290. )
  291. return;
  292. throw error;
  293. }
  294. await forEachIn(jobs, async jobFile => {
  295. const { default: Job } = await import(
  296. `./${this.constructor.name}/models/${modelName}/jobs/${jobFile}`
  297. );
  298. this._jobs[Job.getName()] = Job;
  299. });
  300. });
  301. }
  302. }
  303. export default new DataModule();