DataModule.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. import config from "config";
  2. import mongoose, {
  3. Connection,
  4. isObjectIdOrHexString,
  5. SchemaTypes,
  6. Types
  7. } from "mongoose";
  8. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  9. import { readdir } from "fs/promises";
  10. import path from "path";
  11. import updateVersioningPlugin from "mongoose-update-versioning";
  12. import documentVersionPlugin from "@/models/plugins/documentVersion";
  13. import getDataPlugin from "@/models/plugins/getData";
  14. import Migration from "@/models/Migration";
  15. import JobContext from "@/JobContext";
  16. import BaseModule, { ModuleStatus } from "@/BaseModule";
  17. import { UniqueMethods } from "@/types/Modules";
  18. import { AnyModel, Models } from "@/types/Models";
  19. import { Schemas } from "@/types/Schemas";
  20. import EventsModule from "./EventsModule";
  21. export class DataModule extends BaseModule {
  22. private _models?: Models;
  23. private _mongoConnection?: Connection;
  /**
   * Data Module
   *
   * Constructs the module under the name "data", sets `_dependentModules`
   * to `["events"]` and configures the `getModel` job as "disabled".
   */
  public constructor() {
    super("data");

    this._dependentModules = ["events"];
    this._jobConfig = { getModel: "disabled" };
  }
  /**
   * startup - Startup data module
   *
   * Runs the base startup, then sequentially: opens the Mongo connection,
   * runs migrations, loads the models, syncs their indexes and defines the
   * model jobs. Finishes by calling the base `_started()` hook.
   */
  public override async startup() {
    await super.startup();

    // Each step depends on the previous one, so they run strictly in order.
    await this._createMongoConnection();
    await this._runMigrations();
    await this._loadModels();
    await this._syncModelIndexes();
    await this._defineModelJobs();

    await super._started();
  }
  /**
   * shutdown - Shutdown data module
   *
   * Runs the base shutdown, removes every listener from `patchEventEmitter`,
   * closes the Mongo connection when one exists and then calls `_stopped()`.
   */
  public override async shutdown() {
    await super.shutdown();

    patchEventEmitter.removeAllListeners();

    // No connection exists if startup never got as far as connecting.
    await this._mongoConnection?.close();

    await this._stopped();
  }
  /**
   * createMongoConnection - Create mongo connection
   *
   * Applies the global mongoose options, builds the connection URI from the
   * "mongo" config entry and stores the opened connection on the module.
   */
  private async _createMongoConnection() {
    mongoose.set({
      runValidators: true,
      sanitizeFilter: true,
      strict: "throw",
      strictQuery: "throw"
    });

    const { user, password, host, port, database } = config.get<{
      user: string;
      password: string;
      host: string;
      port: number;
      database: string;
    }>("mongo");

    // Reserved URI characters in the credentials (e.g. "@", ":", "/", "?")
    // would otherwise be parsed as URI delimiters, so they must be
    // percent-encoded.
    // NOTE(review): assumes the config holds raw (not pre-encoded) credentials.
    const credentials = `${encodeURIComponent(user)}:${encodeURIComponent(
      password
    )}`;
    const mongoUrl = `mongodb://${credentials}@${host}:${port}/${database}`;

    this._mongoConnection = await mongoose
      .createConnection(mongoUrl)
      .asPromise();
  }
  /**
   * registerEvents - Register events for schema with event module
   *
   * Reads the schema's "patchHistory" option and, when it is enabled,
   * subscribes to each configured created/updated/deleted event on
   * `patchEventEmitter`. Every received event is re-published through
   * `EventsModule` on the channel `model.<modelName>.<action>`, with
   * `.<documentId>` appended for every action except "created".
   *
   * @param modelName - Name of the model the schema belongs to
   * @param schema - Schema whose "patchHistory" option is inspected
   */
  private async _registerEvents<
    ModelName extends keyof Models,
    // Constrained by the model name itself (not `keyof ModelName`), matching
    // the schema type that _loadModel passes in.
    SchemaType extends Schemas[ModelName]
  >(modelName: ModelName, schema: SchemaType) {
    const { enabled, eventCreated, eventUpdated, eventDeleted } =
      schema.get("patchHistory") ?? {};

    if (!enabled) return;

    const eventsByAction = {
      created: eventCreated,
      updated: eventUpdated,
      deleted: eventDeleted
    };

    Object.entries(eventsByAction)
      // Actions without a configured event name are skipped.
      .filter(([, event]) => !!event)
      .forEach(([action, event]) => {
        patchEventEmitter.on(event, async ({ doc, oldDoc }) => {
          const modelId = doc?._id ?? oldDoc?._id;

          const Model = await this.getModel(modelName);

          // Turn the raw payloads into model documents before publishing.
          const hydratedDoc = doc ? Model.hydrate(doc) : doc;
          const hydratedOldDoc = oldDoc ? Model.hydrate(oldDoc) : oldDoc;

          if (!modelId && action !== "created")
            throw new Error(`Model Id not found for "${event}"`);

          const baseChannel = `model.${modelName}.${action}`;
          const channel =
            action === "created"
              ? baseChannel
              : `${baseChannel}.${modelId}`;

          await EventsModule.publish(channel, {
            doc: hydratedDoc,
            oldDoc: hydratedOldDoc
          });
        });
      });
  }
  /**
   * loadModel - Import and load model schema
   *
   * Dynamically imports `../models/schemas/<modelName>/schema`, applies the
   * shared plugins and options (document version, timestamps, patch history,
   * optional getData, update versioning, toObject/toJSON getters and
   * virtuals, a `_name` virtual), registers the schema's events and installs
   * getters on ObjectId reference paths. The schema is then compiled into a
   * model on the module's Mongo connection.
   *
   * @param modelName - Name of the model
   * @returns Model
   * @throws Error if the Mongo connection has not been created
   */
  private async _loadModel<ModelName extends keyof Models>(
    modelName: ModelName
  ): Promise<Models[ModelName]> {
    if (!this._mongoConnection) throw new Error("Mongo is not available");

    const { schema }: { schema: Schemas[ModelName] } = await import(
      `../models/schemas/${modelName.toString()}/schema`
    );

    schema.plugin(documentVersionPlugin);

    // Timestamps default to enabled unless the schema already set the option.
    schema.set("timestamps", schema.get("timestamps") ?? true);

    // Defaults first, so anything the schema declares itself takes precedence.
    const patchHistoryConfig = {
      enabled: true,
      patchHistoryDisabled: true,
      eventCreated: `${modelName}.created`,
      eventUpdated: `${modelName}.updated`,
      eventDeleted: `${modelName}.deleted`,
      ...(schema.get("patchHistory") ?? {})
    };
    schema.set("patchHistory", patchHistoryConfig);

    if (patchHistoryConfig.enabled) {
      schema.plugin(patchHistoryPlugin, patchHistoryConfig);
    }

    const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
    if (getDataEnabled) schema.plugin(getDataPlugin);

    await this._registerEvents(modelName, schema);

    schema.set("toObject", { getters: true, virtuals: true });
    schema.set("toJSON", { getters: true, virtuals: true });

    schema.virtual("_name").get(() => modelName);

    schema.plugin(updateVersioningPlugin);

    Object.entries(schema.paths)
      // Only ObjectId paths and arrays of ObjectIds can carry a reference.
      .filter(
        ([, type]) =>
          type instanceof SchemaTypes.ObjectId ||
          (type instanceof SchemaTypes.Array &&
            type.caster instanceof SchemaTypes.ObjectId)
      )
      .forEach(([key, type]) => {
        const { ref } =
          (type instanceof SchemaTypes.ObjectId
            ? type?.options
            : type.caster?.options) ?? {};

        if (!ref) return;

        const isSingleReference = type instanceof SchemaTypes.ObjectId;
        const isReferenceArray = type instanceof SchemaTypes.Array;

        schema.path(key).get(value => {
          // `typeof null` is "object", so null must be excluded explicitly;
          // an unset reference stays null, just like null array items below.
          if (
            isSingleReference &&
            value !== null &&
            typeof value === "object"
          )
            return {
              _id: value,
              _name: ref
            };

          if (isReferenceArray && Array.isArray(value))
            return value.map(item =>
              item === null
                ? null
                : {
                    _id: item,
                    _name: ref
                  }
            );

          return value;
        });
      });

    return this._mongoConnection.model(modelName.toString(), schema);
  }
  /**
   * loadModels - Load and initialize all models
   *
   * Enables the "trim" option on the String schema type, then loads each
   * model one after another and stores them on the module.
   *
   * @returns Promise
   */
  private async _loadModels() {
    mongoose.SchemaTypes.String.set("trim", true);

    // Loaded sequentially, in the same order as the resulting object's keys.
    const abc = await this._loadModel("abc");
    const news = await this._loadModel("news");
    const sessions = await this._loadModel("sessions");
    const stations = await this._loadModel("stations");
    const users = await this._loadModel("users");

    this._models = { abc, news, sessions, stations, users };
  }
  /**
   * syncModelIndexes - Sync indexes for all models
   *
   * Calls `syncIndexes()` on every loaded model and waits for all of them.
   *
   * @throws Error if the models have not been loaded
   */
  private async _syncModelIndexes() {
    const models = this._models;

    if (!models) throw new Error("Models not loaded");

    const pendingSyncs = Object.values(models).map(model =>
      model.syncIndexes()
    );

    await Promise.all(pendingSyncs);
  }
  /**
   * getModel - Get model
   *
   * @param name - Name of the model to look up
   * @returns Model
   * @throws Error if the models are not loaded or the module is not started
   */
  public async getModel<ModelName extends keyof Models>(name: ModelName) {
    const models = this._models;

    if (!models) throw new Error("Models not loaded");

    const isStarted = this.getStatus() === ModuleStatus.STARTED;

    if (!isStarted) throw new Error("Module not started");

    return models[name];
  }
  /**
   * loadMigrations - Import and instantiate all migrations
   *
   * Lists `../models/migrations/`, imports the default export of every entry
   * and instantiates it with the module's Mongo connection.
   *
   * @returns Migration instances, ordered by file name
   * @throws Error if the Mongo connection has not been created
   */
  private async _loadMigrations() {
    if (!this._mongoConnection) throw new Error("Mongo is not available");

    // Captured after the guard so the callbacks below need no type assertion.
    const connection = this._mongoConnection;

    const migrationFiles = await readdir(
      path.resolve(__dirname, "../models/migrations/")
    );

    // readdir gives no ordering guarantee, but the caller runs the result
    // sequentially, so the order must be deterministic. Numeric collation
    // keeps e.g. "2-..." ahead of "10-...".
    // NOTE(review): assumes file names encode the intended run order.
    migrationFiles.sort((a, b) =>
      a.localeCompare(b, "en", { numeric: true })
    );

    return Promise.all(
      migrationFiles.map(async migrationFile => {
        const { default: Migrate }: { default: typeof Migration } =
          await import(`../models/migrations/${migrationFile}`);

        return new Migrate(connection);
      })
    );
  }
  /**
   * runMigrations - Run every loaded migration
   *
   * Calls `up()` on each migration one at a time, waiting for each to finish
   * before starting the next.
   */
  private async _runMigrations() {
    const migrations = await this._loadMigrations();

    for (const migration of migrations) {
      // eslint-disable-next-line no-await-in-loop
      await migration.up();
    }
  }
  /**
   * defineModelJobs - Define the job config entries for every model
   *
   * For each loaded model this registers the default `create`, `findById`,
   * `updateById` and `deleteById` jobs under `<modelName>.<method>`, then
   * merges in the jobs declared by the schema's "jobConfig" option. A schema
   * entry of "disabled" removes the job. Otherwise the `api` flag and method
   * are resolved from the existing config entry first and the schema entry
   * second. Schema-provided methods are invoked with the model as `this`.
   *
   * @throws Error if the models have not been loaded, or if a schema job
   * resolves to no callable method
   */
  private async _defineModelJobs() {
    if (!this._models) throw new Error("Models not loaded");

    Object.entries(this._models).forEach(([modelName, model]) => {
      ["create", "findById", "updateById", "deleteById"].forEach(method => {
        this._jobConfig[`${modelName}.${method}`] = {
          method: async (context, payload) =>
            // The handler is looked up on the prototype but invoked on the
            // instance, so `this` inside it is the module.
            Object.getPrototypeOf(this)[`_${method}`].call(this, context, {
              ...payload,
              modelName,
              model
            })
        };
      });

      const jobConfig = model.schema.get("jobConfig");

      // `typeof null` is "object", so null is excluded explicitly.
      if (jobConfig === null || typeof jobConfig !== "object") return;

      Object.entries(jobConfig).forEach(([name, options]) => {
        const jobName = `${modelName}.${name}`;

        // Drops an existing (truthy) entry for this job, if any.
        const removeJob = () => {
          if (this._jobConfig[jobName]) delete this._jobConfig[jobName];
        };

        if (options === "disabled") {
          removeJob();
          return;
        }

        let api = this._jobConfigDefault === true;
        let method;

        // Start from whatever is already configured for this job...
        const configOptions = this._jobConfig[jobName];

        if (typeof configOptions === "object") {
          if (typeof configOptions.api === "boolean")
            api = configOptions.api;

          if (typeof configOptions.method === "function")
            method = configOptions.method;
        } else if (typeof configOptions === "function")
          method = configOptions;
        else if (typeof configOptions === "boolean") api = configOptions;
        else if (this._jobConfigDefault === "disabled") {
          removeJob();
          return;
        }

        // ...then let the schema's own declaration override it.
        if (
          typeof options === "object" &&
          typeof options.api === "boolean"
        )
          api = options.api;
        else if (typeof options === "boolean") api = options;

        if (
          typeof options === "object" &&
          typeof options.method === "function"
        )
          method = async (...args) => options.method.apply(model, args);
        else if (typeof options === "function")
          method = async (...args) => options.apply(model, args);

        if (typeof method !== "function")
          throw new Error(
            `Job "${name}" has no function method defined`
          );

        this._jobConfig[jobName] = {
          api,
          method
        };
      });
    });
  }
  /**
   * findById - Find a single document by its id
   *
   * @param context - Job context, used to assert the
   * `data.<modelName>.findById.<_id>` permission
   * @param payload - Model name, model and the `_id` to look up
   * @returns Result of executing the model's `findById` query
   * @throws Error if `_id` is not an ObjectId or ObjectId hex string
   */
  private async _findById(
    context: JobContext,
    payload: {
      modelName: keyof Models;
      model: AnyModel;
      _id: Types.ObjectId;
    }
  ) {
    const { modelName, model, _id } = payload ?? {};

    await context.assertPermission(`data.${modelName}.findById.${_id}`);

    // Same id validation as _updateById and _deleteById, so a missing or
    // malformed id is rejected before it reaches the query.
    if (!isObjectIdOrHexString(_id))
      throw new Error("_id is not an ObjectId");

    return model.findById(_id).exec();
  }
  /**
   * create - Create a document
   *
   * When the model's schema has a "createdBy" path, it is set to the `_id`
   * of the user returned by the context.
   *
   * @param context - Job context, used to assert the
   * `data.<modelName>.create` permission and to resolve the current user
   * @param payload - Model name, model and the `query` holding the fields
   * @returns Result of the model's `create` call
   * @throws Error if `query` is not a plain, non-empty object
   */
  private async _create(
    context: JobContext,
    payload: {
      modelName: keyof Models;
      model: AnyModel;
      query: Record<string, any[]>;
    }
  ) {
    const { modelName, model, query } = payload ?? {};

    await context.assertPermission(`data.${modelName}.create`);

    // `typeof null` and `typeof []` are both "object": null would crash
    // Object.keys below and an array would bypass the createdBy stamping.
    if (typeof query !== "object" || query === null || Array.isArray(query))
      throw new Error("Query is not an object");

    if (Object.keys(query).length === 0)
      throw new Error("Empty query object provided");

    // Build a new object rather than mutating the caller's payload.
    const document = model.schema.path("createdBy")
      ? { ...query, createdBy: (await context.getUser())._id }
      : query;

    return model.create(document);
  }
  /**
   * updateById - Update a document by its id
   *
   * @param context - Job context, used to assert the
   * `data.<modelName>.updateById.<_id>` permission
   * @param payload - Model name, model, target `_id` and the `query` of
   * fields to set
   * @returns Result of the model's `updateOne` call using `$set`
   * @throws Error if `_id` is not an ObjectId or ObjectId hex string, or if
   * `query` is not a plain, non-empty object
   */
  private async _updateById(
    context: JobContext,
    payload: {
      modelName: keyof Models;
      model: AnyModel;
      _id: Types.ObjectId;
      query: Record<string, any[]>;
    }
  ) {
    const { modelName, model, _id, query } = payload ?? {};

    await context.assertPermission(`data.${modelName}.updateById.${_id}`);

    if (!isObjectIdOrHexString(_id))
      throw new Error("_id is not an ObjectId");

    // `typeof null` and `typeof []` are both "object", so they are rejected
    // explicitly; null would otherwise crash Object.keys below.
    if (typeof query !== "object" || query === null || Array.isArray(query))
      throw new Error("Query is not an object");

    if (Object.keys(query).length === 0)
      throw new Error("Empty query object provided");

    return model.updateOne({ _id }, { $set: query });
  }
  /**
   * deleteById - Delete a document by its id
   *
   * @param context - Job context, used to assert the
   * `data.<modelName>.deleteById.<_id>` permission
   * @param payload - Model name, model and the `_id` to delete
   * @returns Result of the model's `deleteOne` call
   * @throws Error if `_id` is not an ObjectId or ObjectId hex string
   */
  private async _deleteById(
    context: JobContext,
    payload: {
      modelName: keyof Models;
      model: AnyModel;
      _id: Types.ObjectId;
    }
  ) {
    const { modelName, model, _id } = payload ?? {};

    await context.assertPermission(`data.${modelName}.deleteById.${_id}`);

    const isValidId = isObjectIdOrHexString(_id);

    if (!isValidId) throw new Error("_id is not an ObjectId");

    const filter = { _id };

    return model.deleteOne(filter);
  }
  393. }
  // Methods of DataModule as selected by the UniqueMethods helper.
  type DataModuleMethods = UniqueMethods<DataModule>;

  /**
   * Maps each selected DataModule method to the type of its second
   * parameter (`payload`) and its awaited return type (`returns`).
   */
  export type DataModuleJobs = {
    [Method in keyof DataModuleMethods]: {
      payload: Parameters<DataModuleMethods[Method]>[1];
      returns: Awaited<ReturnType<DataModuleMethods[Method]>>;
    };
  };

  // The module's default export is a single shared DataModule instance.
  export default new DataModule();