DataModule.ts 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
  1. import config from "config";
  2. // import { createClient, RedisClientType } from "redis";
  3. import mongoose, {
  4. Connection,
  5. isObjectIdOrHexString,
  6. MongooseDefaultQueryMiddleware,
  7. MongooseDistinctQueryMiddleware,
  8. MongooseQueryOrDocumentMiddleware
  9. } from "mongoose";
  10. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  11. import { readdir } from "fs/promises";
  12. import path from "path";
  13. import JobContext from "../JobContext";
  14. import BaseModule, { ModuleStatus } from "../BaseModule";
  15. import { UniqueMethods } from "../types/Modules";
  16. import { AnyModel, Models } from "../types/Models";
  17. import { Schemas } from "../types/Schemas";
  18. import documentVersionPlugin from "../schemas/plugins/documentVersion";
  19. import getDataPlugin from "../schemas/plugins/getData";
  20. import Migration from "../Migration";
  21. /**
  22. * Experimental: function to get all nested keys from a MongoDB query object
  23. */
  24. function getAllKeys(obj: object) {
  25. const keys: string[] = [];
  26. function processObject(obj: object, parentKey = "") {
  27. let returnChanged = false;
  28. // eslint-disable-next-line
  29. for (let key in obj) {
  30. // eslint-disable-next-line
  31. if (obj.hasOwnProperty(key)) {
  32. if (key.startsWith("$")) {
  33. // eslint-disable-next-line
  34. // @ts-ignore
  35. // eslint-disable-next-line
  36. processNestedObject(obj[key], parentKey); // Process nested keys without including the current key
  37. // eslint-disable-next-line
  38. continue; // Skip the current key
  39. }
  40. const currentKey = parentKey ? `${parentKey}.${key}` : key;
  41. // eslint-disable-next-line
  42. // @ts-ignore
  43. if (typeof obj[key] === "object" && obj[key] !== null) {
  44. // eslint-disable-next-line
  45. // @ts-ignore
  46. if (Array.isArray(obj[key])) {
  47. // eslint-disable-next-line
  48. // @ts-ignore
  49. // eslint-disable-next-line
  50. if (processArray(obj[key], currentKey)) {
  51. returnChanged = true;
  52. // eslint-disable-next-line
  53. continue;
  54. }
  55. }
  56. // eslint-disable-next-line
  57. // @ts-ignore
  58. else if (processObject(obj[key], currentKey)) {
  59. returnChanged = true;
  60. // eslint-disable-next-line
  61. continue;
  62. }
  63. }
  64. keys.push(currentKey);
  65. returnChanged = true;
  66. }
  67. }
  68. return returnChanged;
  69. }
  70. function processArray(arr: Array<any>, parentKey: string) {
  71. let returnChanged = false;
  72. for (let i = 0; i < arr.length; i += 1) {
  73. const currentKey = parentKey;
  74. if (typeof arr[i] === "object" && arr[i] !== null) {
  75. if (Array.isArray(arr[i])) {
  76. if (processArray(arr[i], currentKey)) returnChanged = true;
  77. } else if (processObject(arr[i], currentKey))
  78. returnChanged = true;
  79. }
  80. }
  81. return returnChanged;
  82. }
  83. function processNestedObject(obj: object, parentKey: string) {
  84. if (typeof obj === "object" && obj !== null) {
  85. if (Array.isArray(obj)) {
  86. processArray(obj, parentKey);
  87. } else {
  88. processObject(obj, parentKey);
  89. }
  90. }
  91. }
  92. processObject(obj);
  93. return keys;
  94. }
  95. export default class DataModule extends BaseModule {
  96. private _models?: Models;
  97. private _mongoConnection?: Connection;
  98. // private _redisClient?: RedisClientType;
  99. /**
  100. * Data Module
  101. */
  102. public constructor() {
  103. super("data");
  104. this._dependentModules = ["events"];
  105. this._jobConfig = {
  106. getModel: false
  107. };
  108. }
  109. /**
  110. * startup - Startup data module
  111. */
  112. public override async startup() {
  113. await super.startup();
  114. await this._createMongoConnection();
  115. await this._runMigrations();
  116. await this._loadModels();
  117. await this._syncModelIndexes();
  118. await this._defineModelJobs();
  119. // @ts-ignore
  120. // this._redisClient = createClient({ ...config.get("redis") });
  121. //
  122. // await this._redisClient.connect();
  123. //
  124. // const redisConfigResponse = await this._redisClient.sendCommand([
  125. // "CONFIG",
  126. // "GET",
  127. // "notify-keyspace-events"
  128. // ]);
  129. //
  130. // if (
  131. // !(
  132. // Array.isArray(redisConfigResponse) &&
  133. // redisConfigResponse[1] === "xE"
  134. // )
  135. // )
  136. // throw new Error(
  137. // `notify-keyspace-events is NOT configured correctly! It is set to: ${
  138. // (Array.isArray(redisConfigResponse) &&
  139. // redisConfigResponse[1]) ||
  140. // "unknown"
  141. // }`
  142. // );
  143. await super._started();
  144. }
  145. /**
  146. * shutdown - Shutdown data module
  147. */
  148. public override async shutdown() {
  149. await super.shutdown();
  150. // if (this._redisClient) await this._redisClient.quit();
  151. patchEventEmitter.removeAllListeners();
  152. if (this._mongoConnection) await this._mongoConnection.close();
  153. await this._stopped();
  154. }
  155. /**
  156. * createMongoConnection - Create mongo connection
  157. */
  158. private async _createMongoConnection() {
  159. mongoose.set({
  160. runValidators: true,
  161. sanitizeFilter: true,
  162. strict: "throw",
  163. strictQuery: "throw"
  164. });
  165. const { user, password, host, port, database } = config.get<{
  166. user: string;
  167. password: string;
  168. host: string;
  169. port: number;
  170. database: string;
  171. }>("mongo");
  172. const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
  173. this._mongoConnection = await mongoose
  174. .createConnection(mongoUrl)
  175. .asPromise();
  176. }
  177. /**
  178. * registerEvents - Register events for schema with event module
  179. */
  180. private async _registerEvents<
  181. ModelName extends keyof Models,
  182. SchemaType extends Schemas[keyof ModelName]
  183. >(modelName: ModelName, schema: SchemaType) {
  184. const methods: string[] = [
  185. "aggregate",
  186. "count",
  187. "countDocuments",
  188. "deleteOne",
  189. "deleteMany",
  190. "estimatedDocumentCount",
  191. "find",
  192. "findOne",
  193. "findOneAndDelete",
  194. "findOneAndRemove",
  195. "findOneAndReplace",
  196. "findOneAndUpdate",
  197. // "init",
  198. "insertMany",
  199. "remove",
  200. "replaceOne",
  201. "save",
  202. "update",
  203. "updateOne",
  204. "updateMany"
  205. // "validate"
  206. ];
  207. methods.forEach(method => {
  208. // NOTE: some Mongo selectors may also search through linked documents. Prevent that
  209. schema.pre(method, async function () {
  210. console.log(`Pre-${method}! START`);
  211. if (
  212. this.options?.userContext &&
  213. ["find", "update", "deleteOne", "save"].indexOf(method) ===
  214. -1
  215. )
  216. throw new Error("Method not allowed");
  217. console.log(`Pre-${method}!`, this.options?.userContext);
  218. if (["find", "update", "deleteOne"].indexOf(method) !== -1) {
  219. const filter = this.getFilter();
  220. const filterKeys = getAllKeys(filter);
  221. filterKeys.forEach(filterKey => {
  222. const splitFilterKeys = filterKey
  223. .split(".")
  224. .reduce(
  225. (keys: string[], key: string) =>
  226. keys.length > 0
  227. ? [
  228. ...keys,
  229. `${
  230. keys[keys.length - 1]
  231. }.${key}`
  232. ]
  233. : [key],
  234. []
  235. );
  236. splitFilterKeys.forEach(splitFilterKey => {
  237. const path = this.schema.path(splitFilterKey);
  238. if (!path) {
  239. throw new Error(
  240. "Attempted to query with non-existant property"
  241. );
  242. }
  243. if (path.options.restricted) {
  244. throw new Error(
  245. "Attempted to query with restricted property"
  246. );
  247. }
  248. });
  249. });
  250. console.log(`Pre-${method}!`, filterKeys);
  251. // Here we want to always exclude some properties depending on the model, like passwords/tokens
  252. this.projection({ restrictedName: 0 });
  253. }
  254. console.log(`Pre-${method}! END`);
  255. });
  256. schema.post(method, async function (docOrDocs) {
  257. console.log(`Post-${method} START!`);
  258. console.log(`Post-${method}!`, docOrDocs);
  259. console.log(`Post-${method}!`, this);
  260. console.log(`Post-${method} END!`);
  261. });
  262. });
  263. const { enabled, eventCreated, eventUpdated, eventDeleted } =
  264. schema.get("patchHistory") ?? {};
  265. if (!enabled) return;
  266. Object.entries({
  267. created: eventCreated,
  268. updated: eventUpdated,
  269. deleted: eventDeleted
  270. })
  271. .filter(([, event]) => !!event)
  272. .forEach(([action, event]) => {
  273. patchEventEmitter.on(event, async ({ doc }) => {
  274. await this._jobQueue.runJob("events", "publish", {
  275. channel: `model.${modelName}.${doc._id}.${action}`,
  276. value: doc
  277. });
  278. });
  279. });
  280. }
  281. /**
  282. * loadModel - Import and load model schema
  283. *
  284. * @param modelName - Name of the model
  285. * @returns Model
  286. */
  287. private async _loadModel<ModelName extends keyof Models>(
  288. modelName: ModelName
  289. ): Promise<Models[ModelName]> {
  290. if (!this._mongoConnection) throw new Error("Mongo is not available");
  291. const { schema }: { schema: Schemas[ModelName] } = await import(
  292. `../schemas/${modelName.toString()}`
  293. );
  294. schema.plugin(documentVersionPlugin);
  295. schema.set("timestamps", schema.get("timestamps") ?? true);
  296. const patchHistoryConfig = {
  297. enabled: true,
  298. patchHistoryDisabled: true,
  299. eventCreated: `${modelName}.created`,
  300. eventUpdated: `${modelName}.updated`,
  301. eventDeleted: `${modelName}.deleted`,
  302. ...(schema.get("patchHistory") ?? {})
  303. };
  304. schema.set("patchHistory", patchHistoryConfig);
  305. if (patchHistoryConfig.enabled) {
  306. schema.plugin(patchHistoryPlugin, patchHistoryConfig);
  307. }
  308. const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
  309. if (getDataEnabled) schema.plugin(getDataPlugin);
  310. await this._registerEvents(modelName, schema);
  311. return this._mongoConnection.model(modelName.toString(), schema);
  312. }
  313. /**
  314. * loadModels - Load and initialize all models
  315. *
  316. * @returns Promise
  317. */
  318. private async _loadModels() {
  319. mongoose.SchemaTypes.String.set("trim", true);
  320. this._models = {
  321. abc: await this._loadModel("abc"),
  322. news: await this._loadModel("news"),
  323. session: await this._loadModel("session"),
  324. station: await this._loadModel("station"),
  325. user: await this._loadModel("user")
  326. };
  327. }
  328. /**
  329. * syncModelIndexes - Sync indexes for all models
  330. */
  331. private async _syncModelIndexes() {
  332. if (!this._models) throw new Error("Models not loaded");
  333. await Promise.all(
  334. Object.values(this._models).map(model => model.syncIndexes())
  335. );
  336. }
  337. /**
  338. * getModel - Get model
  339. *
  340. * @returns Model
  341. */
  342. public async getModel<ModelName extends keyof Models>(
  343. jobContext: JobContext,
  344. payload: ModelName | { name: ModelName }
  345. ) {
  346. if (!this._models) throw new Error("Models not loaded");
  347. if (this.getStatus() !== ModuleStatus.STARTED)
  348. throw new Error("Module not started");
  349. const name = typeof payload === "object" ? payload.name : payload;
  350. return this._models[name];
  351. }
  352. private async _loadMigrations() {
  353. if (!this._mongoConnection) throw new Error("Mongo is not available");
  354. const migrations = await readdir(
  355. path.resolve(__dirname, "../schemas/migrations/")
  356. );
  357. return Promise.all(
  358. migrations.map(async migrationFile => {
  359. const { default: Migrate }: { default: typeof Migration } =
  360. await import(`../schemas/migrations/${migrationFile}`);
  361. return new Migrate(this._mongoConnection as Connection);
  362. })
  363. );
  364. }
  365. private async _runMigrations() {
  366. const migrations = await this._loadMigrations();
  367. for (let i = 0; i < migrations.length; i += 1) {
  368. const migration = migrations[i];
  369. // eslint-disable-next-line no-await-in-loop
  370. await migration.up();
  371. }
  372. }
  373. private async _defineModelJobs() {
  374. if (!this._models) throw new Error("Models not loaded");
  375. await Promise.all(
  376. Object.entries(this._models).map(async ([modelName, model]) => {
  377. await Promise.all(
  378. ["create", "findById", "updateById", "deleteById"].map(
  379. async method => {
  380. this._jobConfig[`${modelName}.${method}`] = {
  381. method: async (context, payload) =>
  382. Object.getPrototypeOf(this)[`_${method}`](
  383. context,
  384. {
  385. ...payload,
  386. modelName,
  387. model
  388. }
  389. )
  390. };
  391. }
  392. )
  393. );
  394. const jobConfig = model.schema.get("jobConfig");
  395. if (
  396. typeof jobConfig === "object" &&
  397. Object.keys(jobConfig).length > 0
  398. )
  399. await Promise.all(
  400. Object.entries(jobConfig).map(
  401. async ([name, options]) => {
  402. if (options === "disabled") {
  403. if (this._jobConfig[`${modelName}.${name}`])
  404. delete this._jobConfig[
  405. `${modelName}.${name}`
  406. ];
  407. return;
  408. }
  409. let api = this._jobApiDefault;
  410. let method;
  411. const configOptions =
  412. this._jobConfig[`${modelName}.${name}`];
  413. if (typeof configOptions === "object") {
  414. if (typeof configOptions.api === "boolean")
  415. api = configOptions.api;
  416. if (
  417. typeof configOptions.method ===
  418. "function"
  419. )
  420. method = configOptions.method;
  421. } else if (typeof configOptions === "function")
  422. method = configOptions;
  423. else if (typeof configOptions === "boolean")
  424. api = configOptions;
  425. if (
  426. typeof options === "object" &&
  427. typeof options.api === "boolean"
  428. )
  429. api = options.api;
  430. else if (typeof options === "boolean")
  431. api = options;
  432. if (
  433. typeof options === "object" &&
  434. typeof options.method === "function"
  435. )
  436. method = async (...args) =>
  437. options.method.apply(model, args);
  438. else if (typeof options === "function")
  439. method = async (...args) =>
  440. options.apply(model, args);
  441. if (typeof method !== "function")
  442. throw new Error(
  443. `Job "${name}" has no function method defined`
  444. );
  445. this._jobConfig[`${modelName}.${name}`] = {
  446. api,
  447. method
  448. };
  449. }
  450. )
  451. );
  452. })
  453. );
  454. }
  455. private async _findById(
  456. context: JobContext,
  457. payload: {
  458. modelName: keyof Models;
  459. model: AnyModel;
  460. _id: Types.ObjectId;
  461. }
  462. ) {
  463. const { modelName, model, _id } = payload ?? {};
  464. await context.assertPermission(`data.${modelName}.findById.${_id}`);
  465. const query = model.findById(_id);
  466. return query.exec();
  467. }
  468. private async _create(
  469. context: JobContext,
  470. payload: {
  471. modelName: keyof Models;
  472. model: AnyModel;
  473. query: Record<string, any[]>;
  474. }
  475. ) {
  476. const { modelName, model, query } = payload ?? {};
  477. await context.assertPermission(`data.${modelName}.create`);
  478. if (typeof query !== "object")
  479. throw new Error("Query is not an object");
  480. if (Object.keys(query).length === 0)
  481. throw new Error("Empty query object provided");
  482. if (model.schema.path("createdBy"))
  483. query.createdBy = (await context.getUser())._id;
  484. return model.create(query);
  485. }
  486. private async _updateById(
  487. context: JobContext,
  488. payload: {
  489. modelName: keyof Models;
  490. model: AnyModel;
  491. _id: Types.ObjectId;
  492. query: Record<string, any[]>;
  493. }
  494. ) {
  495. const { modelName, model, _id, query } = payload ?? {};
  496. await context.assertPermission(`data.${modelName}.updateById.${_id}`);
  497. if (!isObjectIdOrHexString(_id))
  498. throw new Error("_id is not an ObjectId");
  499. if (typeof query !== "object")
  500. throw new Error("Query is not an object");
  501. if (Object.keys(query).length === 0)
  502. throw new Error("Empty query object provided");
  503. return model.updateOne({ _id }, { $set: query });
  504. }
  505. private async _deleteById(
  506. context: JobContext,
  507. payload: {
  508. modelName: keyof Models;
  509. model: AnyModel;
  510. _id: Types.ObjectId;
  511. }
  512. ) {
  513. const { modelName, model, _id } = payload ?? {};
  514. await context.assertPermission(`data.${modelName}.deleteById.${_id}`);
  515. if (!isObjectIdOrHexString(_id))
  516. throw new Error("_id is not an ObjectId");
  517. return model.deleteOne({ _id });
  518. }
  519. }
  520. export type DataModuleJobs = {
  521. [Property in keyof UniqueMethods<DataModule>]: {
  522. payload: Parameters<UniqueMethods<DataModule>[Property]>[1];
  523. returns: Awaited<ReturnType<UniqueMethods<DataModule>[Property]>>;
  524. };
  525. };