DataModule.ts 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. import config from "config";
  2. import mongoose, { Connection, Model, Schema, SchemaTypes } from "mongoose";
  3. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  4. import { readdir } from "fs/promises";
  5. import path from "path";
  6. import updateVersioningPlugin from "mongoose-update-versioning";
  7. import { forEachIn } from "@common/utils/forEachIn";
  8. import Migration from "@/modules/DataModule/Migration";
  9. import documentVersionPlugin from "@/modules/DataModule/plugins/documentVersion";
  10. import getDataPlugin from "@/modules/DataModule/plugins/getData";
  11. import BaseModule, { ModuleStatus } from "@/BaseModule";
  12. import EventsModule from "./EventsModule";
  13. import DataModuleJob from "./DataModule/DataModuleJob";
  14. import Job from "@/Job";
  15. export class DataModule extends BaseModule {
  16. private _models?: Record<string, Model<any>>;
  17. private _mongoConnection?: Connection;
  18. declare _jobs: Record<string, typeof Job | typeof DataModuleJob>;
  19. /**
  20. * Data Module
  21. */
  22. public constructor() {
  23. super("data");
  24. this._dependentModules = ["events"];
  25. }
  26. /**
  27. * startup - Startup data module
  28. */
  29. public override async startup() {
  30. await super.startup();
  31. await this._createMongoConnection();
  32. await this._runMigrations();
  33. await this._loadModels();
  34. await this._syncModelIndexes();
  35. await this._loadModelJobs();
  36. await super._started();
  37. }
  38. /**
  39. * shutdown - Shutdown data module
  40. */
  41. public override async shutdown() {
  42. await super.shutdown();
  43. patchEventEmitter.removeAllListeners();
  44. if (this._mongoConnection) await this._mongoConnection.close();
  45. await this._stopped();
  46. }
  47. /**
  48. * createMongoConnection - Create mongo connection
  49. */
  50. private async _createMongoConnection() {
  51. mongoose.set({
  52. runValidators: true,
  53. sanitizeFilter: true,
  54. strict: "throw",
  55. strictQuery: "throw"
  56. });
  57. const { user, password, host, port, database } = config.get<{
  58. user: string;
  59. password: string;
  60. host: string;
  61. port: number;
  62. database: string;
  63. }>("mongo");
  64. const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
  65. this._mongoConnection = await mongoose
  66. .createConnection(mongoUrl)
  67. .asPromise();
  68. }
  69. /**
  70. * registerEvents - Register events for schema with event module
  71. */
  72. private async _registerEvents(modelName: string, schema: Schema<any>) {
  73. const { enabled, eventCreated, eventUpdated, eventDeleted } =
  74. schema.get("patchHistory") ?? {};
  75. if (!enabled) return;
  76. Object.entries({
  77. created: eventCreated,
  78. updated: eventUpdated,
  79. deleted: eventDeleted
  80. })
  81. .filter(([, event]) => !!event)
  82. .forEach(([action, event]) => {
  83. patchEventEmitter.on(event!, async ({ doc, oldDoc }) => {
  84. const modelId = doc?._id ?? oldDoc?._id;
  85. const Model = await this.getModel(modelName);
  86. if (doc) doc = Model.hydrate(doc);
  87. if (oldDoc) oldDoc = Model.hydrate(oldDoc);
  88. if (!modelId && action !== "created")
  89. throw new Error(`Model Id not found for "${event}"`);
  90. const channel = `model.${modelName}.${action}`;
  91. await EventsModule.publish(channel, { doc, oldDoc });
  92. if (action !== "created")
  93. await EventsModule.publish(`${channel}.${modelId}`, {
  94. doc,
  95. oldDoc
  96. });
  97. });
  98. });
  99. }
  100. /**
  101. * registerEvents - Register events for schema with event module
  102. */
  103. private async _registerEventListeners(schema: Schema<any>) {
  104. const eventListeners = schema.get("eventListeners");
  105. if (
  106. typeof eventListeners !== "object" ||
  107. Object.keys(eventListeners).length === 0
  108. )
  109. return;
  110. await forEachIn(
  111. Object.entries(eventListeners),
  112. async ([event, callback]) =>
  113. EventsModule.subscribe("event", event, callback)
  114. );
  115. }
  116. /**
  117. * loadModel - Import and load model schema
  118. *
  119. * @param modelName - Name of the model
  120. * @returns Model
  121. */
  122. private async _loadModel(modelName: string): Promise<Model<any>> {
  123. if (!this._mongoConnection) throw new Error("Mongo is not available");
  124. const { schema }: { schema: Schema<any> } = await import(
  125. `./DataModule/models/${modelName.toString()}/schema`
  126. );
  127. schema.plugin(documentVersionPlugin);
  128. schema.set("timestamps", schema.get("timestamps") ?? true);
  129. const patchHistoryConfig = {
  130. enabled: true,
  131. patchHistoryDisabled: true,
  132. eventCreated: `${modelName}.created`,
  133. eventUpdated: `${modelName}.updated`,
  134. eventDeleted: `${modelName}.deleted`,
  135. ...(schema.get("patchHistory") ?? {})
  136. };
  137. schema.set("patchHistory", patchHistoryConfig);
  138. if (patchHistoryConfig.enabled) {
  139. schema.plugin(patchHistoryPlugin, patchHistoryConfig);
  140. }
  141. const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
  142. if (getDataEnabled) schema.plugin(getDataPlugin);
  143. schema.static("getModelName", () => modelName);
  144. await this._registerEvents(modelName, schema);
  145. await this._registerEventListeners(schema);
  146. schema.set("toObject", { getters: true, virtuals: true });
  147. schema.set("toJSON", { getters: true, virtuals: true });
  148. schema.virtual("_name").get(() => modelName);
  149. schema.plugin(updateVersioningPlugin);
  150. await forEachIn(
  151. Object.entries(schema.paths).filter(
  152. ([, type]) =>
  153. type instanceof SchemaTypes.ObjectId ||
  154. (type instanceof SchemaTypes.Array &&
  155. type.caster instanceof SchemaTypes.ObjectId)
  156. ),
  157. async ([key, type]) => {
  158. const { ref } =
  159. (type instanceof SchemaTypes.Array
  160. ? type.caster?.options
  161. : type?.options) ?? {};
  162. if (ref)
  163. schema.path(key).get((value: any) => {
  164. if (
  165. typeof value === "object" &&
  166. type instanceof SchemaTypes.ObjectId
  167. )
  168. return {
  169. _id: value,
  170. _name: ref
  171. };
  172. if (
  173. Array.isArray(value) &&
  174. type instanceof SchemaTypes.Array
  175. )
  176. return value.map(item =>
  177. item === null
  178. ? null
  179. : {
  180. _id: item,
  181. _name: ref
  182. }
  183. );
  184. return value;
  185. });
  186. }
  187. );
  188. return this._mongoConnection.model(modelName.toString(), schema);
  189. }
  190. /**
  191. * loadModels - Load and initialize all models
  192. *
  193. * @returns Promise
  194. */
  195. private async _loadModels() {
  196. mongoose.SchemaTypes.String.set("trim", true);
  197. this._models = {
  198. abc: await this._loadModel("abc"),
  199. minifiedUsers: await this._loadModel("minifiedUsers"),
  200. news: await this._loadModel("news"),
  201. sessions: await this._loadModel("sessions"),
  202. stations: await this._loadModel("stations"),
  203. users: await this._loadModel("users")
  204. };
  205. }
  206. /**
  207. * syncModelIndexes - Sync indexes for all models
  208. */
  209. private async _syncModelIndexes() {
  210. if (!this._models) throw new Error("Models not loaded");
  211. await forEachIn(
  212. Object.values(this._models).filter(
  213. model => model.schema.get("autoIndex") !== false
  214. ),
  215. model => model.syncIndexes()
  216. );
  217. }
  218. /**
  219. * getModel - Get model
  220. *
  221. * @returns Model
  222. */
  223. public async getModel<ModelType extends Model<any>>(
  224. name: string
  225. ): Promise<ModelType> {
  226. if (!this._models) throw new Error("Models not loaded");
  227. if (this.getStatus() !== ModuleStatus.STARTED)
  228. throw new Error("Module not started");
  229. if (!this._models[name]) throw new Error("Model not found");
  230. return this._models[name] as ModelType;
  231. }
  232. private async _loadModelMigrations(modelName: string) {
  233. if (!this._mongoConnection) throw new Error("Mongo is not available");
  234. let migrations;
  235. try {
  236. migrations = await readdir(
  237. path.resolve(
  238. __dirname,
  239. `./DataModule/models/${modelName}/migrations/`
  240. )
  241. );
  242. } catch (error) {
  243. if (
  244. error instanceof Error &&
  245. "code" in error &&
  246. error.code === "ENOENT"
  247. )
  248. return [];
  249. throw error;
  250. }
  251. return forEachIn(migrations, async migrationFile => {
  252. const { default: Migrate }: { default: typeof Migration } =
  253. await import(
  254. `./DataModule/models/${modelName}/migrations/${migrationFile}`
  255. );
  256. return new Migrate(this._mongoConnection as Connection);
  257. });
  258. }
  259. private async _loadMigrations() {
  260. const models = await readdir(
  261. path.resolve(__dirname, "./DataModule/models/")
  262. );
  263. return forEachIn(models, async modelName =>
  264. this._loadModelMigrations(modelName)
  265. );
  266. }
  267. private async _runMigrations() {
  268. const migrations = (await this._loadMigrations()).flat();
  269. for (let i = 0; i < migrations.length; i += 1) {
  270. const migration = migrations[i];
  271. // eslint-disable-next-line no-await-in-loop
  272. await migration.up();
  273. }
  274. }
  275. private async _loadModelJobs() {
  276. if (!this._models) throw new Error("Models not loaded");
  277. await forEachIn(Object.keys(this._models), async modelName => {
  278. let jobs;
  279. try {
  280. jobs = await readdir(
  281. path.resolve(
  282. __dirname,
  283. `./${this.constructor.name}/models/${modelName}/jobs/`
  284. )
  285. );
  286. } catch (error) {
  287. if (
  288. error instanceof Error &&
  289. "code" in error &&
  290. error.code === "ENOENT"
  291. )
  292. return;
  293. throw error;
  294. }
  295. await forEachIn(jobs, async jobFile => {
  296. const { default: Job } = await import(
  297. `./${this.constructor.name}/models/${modelName}/jobs/${jobFile}`
  298. );
  299. this._jobs[Job.getName()] = Job;
  300. });
  301. });
  302. }
  303. }
  304. export default new DataModule();