DataModule.ts 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. import async from "async";
  2. import config from "config";
  3. import mongoose, { Schema } from "mongoose";
  4. import hash from "object-hash";
  5. import { createClient, RedisClientType } from "redis";
  6. import JobContext from "src/JobContext";
  7. import BaseModule from "../BaseModule";
  8. import ModuleManager from "../ModuleManager";
  9. import { UniqueMethods } from "../types/Modules";
  10. import { Collections } from "../types/Collections";
  /**
   * Data module.
   *
   * On startup it connects to MongoDB (through mongoose) and Redis and loads the
   * collection schemas/models. Its `find` job validates a filter and projection
   * against a collection's schema, optionally serves results from a short-lived
   * Redis cache, and strips restricted or unrequested properties from the
   * documents it returns.
   */
  export default class DataModule extends BaseModule {
    collections?: Collections;

    redis?: RedisClientType;

    /**
     * Data Module
     *
     * @param moduleManager - Module manager class
     */
    public constructor(moduleManager: ModuleManager) {
      super(moduleManager, "data");
    }

    /**
     * startup - Startup data module
     *
     * Runs the following steps in order, rejecting on the first failure:
     * base module startup, MongoDB connection, collection loading, index
     * synchronisation, Redis connection, and a check that Redis has
     * `notify-keyspace-events` set to exactly `xE`.
     *
     * @throws Error if collections could not be loaded or Redis is misconfigured
     */
    public override async startup(): Promise<void> {
      await super.startup();

      await mongoose.connect(config.get<string>("mongo.url"));

      await this.loadCollections();
      if (!this.collections)
        throw new Error("Collections have not been loaded");

      // Make the indexes in MongoDB match the ones declared on every model
      await async.each(Object.values(this.collections), async collection =>
        collection.model.syncIndexes()
      );

      const { url, password } = config.get<{
        url: string;
        password: string;
      }>("redis");
      const redis: RedisClientType = createClient({ url, password });
      this.redis = redis;
      await redis.connect();

      // CONFIG GET replies with [name, value]; only the value is of interest
      const keyspaceConfig: unknown = await redis.sendCommand([
        "CONFIG",
        "GET",
        "notify-keyspace-events"
      ]);
      const configuredEvents = Array.isArray(keyspaceConfig)
        ? keyspaceConfig[1]
        : undefined;
      if (configuredEvents !== "xE")
        throw new Error(
          `notify-keyspace-events is NOT configured correctly! It is set to: ${
            configuredEvents || "unknown"
          }`
        );

      await super.started();
    }

    /**
     * shutdown - Shutdown data module
     *
     * Shuts the base module down, then closes the Redis client (if one was
     * created) and the mongoose connection. The mongoose connection is closed
     * even when quitting Redis fails. Any failure is propagated to the caller
     * instead of being dropped as an unhandled rejection.
     */
    public override async shutdown(): Promise<void> {
      await super.shutdown();

      // TODO: Ensure the following shutdown correctly
      try {
        if (this.redis) await this.redis.quit();
      } finally {
        await mongoose.connection.close(false);
      }
    }

    /**
     * loadCollection - Import and load collection schema
     *
     * Dynamically imports `../collections/<collectionName>`, builds a mongoose
     * schema from its exported `schema` and registers a model under the
     * collection name. Import or schema errors reject the returned promise.
     *
     * @param collectionName - Name of the collection
     * @returns Collection (its schema definition together with its model)
     */
    private async loadCollection<T extends keyof Collections>(
      collectionName: T
    ): Promise<Collections[T]> {
      const name = collectionName.toString();

      const { schema }: { schema: Collections[T]["schema"] } = await import(
        `../collections/${name}`
      );

      const mongoSchema = new Schema<Collections[T]["schema"]["document"]>(
        schema.document,
        {
          timestamps: schema.timestamps
        }
      );
      const model = mongoose.model(name, mongoSchema);

      // The generic model type is not inferred as Collections[T]["model"]
      // @ts-ignore
      return { schema, model };
    }

    /**
     * loadCollections - Load and initialize all collections
     *
     * @returns Promise that resolves once `this.collections` has been set
     */
    private async loadCollections(): Promise<void> {
      this.collections = {
        abc: await this.loadCollection("abc")
      };
    }

    /**
     * Returns the projection array/object that is one level deeper based on the property key
     *
     * For an array, entries starting with `<key>.` are kept with that prefix
     * removed. For an object, `projection[key]` is returned when it is set;
     * otherwise an object is built from the `<key>.`-prefixed properties, again
     * with the prefix removed. Any other projection (including null) yields
     * undefined.
     *
     * @param projection The projection object/array
     * @param key The property key
     * @returns Array or Object, or undefined when there is no usable projection
     */
    private getDeeperProjection(projection: any, key: string) {
      const prefix = `${key}.`;

      if (Array.isArray(projection))
        return projection
          .filter((property: string) => property.startsWith(prefix))
          .map((property: string) => property.slice(prefix.length));

      // typeof null is "object", so null has to be excluded explicitly
      if (projection && typeof projection === "object")
        return (
          projection[key] ??
          Object.fromEntries(
            Object.entries(projection)
              .filter(([property]) => property.startsWith(prefix))
              .map(([property, value]) => [
                property.slice(prefix.length),
                value
              ])
          )
        );

      return undefined;
    }

    /**
     * Whether a property is allowed in a projection array/object
     *
     * @param projection The projection object/array
     * @param property The property name to look up
     * @returns true if the array contains the property, or the object has a truthy value for it
     */
    private allowedByProjection(projection: any, property: string) {
      if (Array.isArray(projection)) return projection.includes(property);
      if (projection && typeof projection === "object")
        return !!projection[property];
      return false;
    }

    /**
     * Strip a document object from any unneeded properties, or of any restricted properties
     * If a projection is given
     *
     * Properties that are not in the schema are always dropped. With a
     * projection, only properties allowed by it are kept; without one, every
     * property that is not marked `restricted` is kept. Schema entries without
     * a `type` are treated as nested objects and stripped recursively.
     *
     * @param document The document object
     * @param schema The schema object
     * @param projection The projection, which can be null
     * @returns A new object containing only the properties that may be returned
     */
    private async stripDocument(
      document: any,
      schema: any,
      projection: any
    ): Promise<Record<string, unknown>> {
      // TODO add support for nested objects in arrays
      // TODO handle projection excluding properties, rather than assume it's only including properties
      const keptEntries: [string, unknown][] = [];

      for (const [key, value] of Object.entries(document)) {
        const definition = schema[key];

        // If the property does not exist in the schema, skip it
        if (!definition) continue;

        // Handle nested object
        if (definition.type === undefined) {
          // A falsy value can't be an object to recurse into, so keep it as-is
          if (!value) {
            keptEntries.push([key, value]);
            continue;
          }

          // Strip the nested object using the projection for the next layer
          const strippedDocument = await this.stripDocument(
            value,
            definition,
            this.getDeeperProjection(projection, key)
          );

          // Only keep the nested object if anything is left in it
          if (Object.keys(strippedDocument).length > 0)
            keptEntries.push([key, strippedDocument]);
          continue;
        }

        // With a projection, it alone decides whether the property is kept
        if (projection) {
          if (this.allowedByProjection(projection, key))
            keptEntries.push([key, value]);
          continue;
        }

        // Without a projection, everything that is not restricted is kept
        if (!definition.restricted) keptEntries.push([key, value]);
      }

      return Object.fromEntries(keptEntries);
    }

    /**
     * Parse a projection based on the schema and any given projection
     * If no projection is given, it will exclude any restricted properties
     * If a projection is given, it will exclude restricted properties that are not explicitly allowed in a projection
     * It will return a projection used in Mongo, and if any restricted property is explicitly allowed, return that we can't use the cache
     *
     * @param projection The projection, which can be null
     * @param schema The schema object
     * @returns The Mongo projection and whether the result may be cached
     */
    private async parseFindProjection(
      projection: any,
      schema: any
    ): Promise<{ canCache: boolean; mongoProjection: Record<string, any> }> {
      // The mongo projection object we're going to build
      const mongoProjection: Record<string, any> = {};
      // This will be false if we let Mongo return any restricted properties
      let canCache = true;

      // TODO add support for nested objects in arrays
      for (const [key, value] of Object.entries(schema)) {
        const explicitlyAllowed =
          !!projection && this.allowedByProjection(projection, key);

        if (value.restricted) {
          // Restricted and explicitly allowed: Mongo returns it, so the result can't be cached.
          // Restricted and not allowed: have Mongo exclude it, which keeps caching safe.
          if (explicitlyAllowed) canCache = false;
          else mongoProjection[key] = false;
          continue;
        }

        // Anything with a type is a plain property that needs no special handling
        if (value.type !== undefined) continue;

        // Nested object: parse one level deeper, with the deeper projection if we have one
        const parsedProjection = await this.parseFindProjection(
          projection ? this.getDeeperProjection(projection, key) : null,
          value
        );

        // Only include the nested projection if it contains anything
        if (Object.keys(parsedProjection.mongoProjection).length > 0)
          mongoProjection[key] = parsedProjection.mongoProjection;

        // Without a projection nothing restricted can be allowed, so canCache only matters with one
        if (projection) canCache = canCache && parsedProjection.canCache;
      }

      return {
        canCache,
        mongoProjection
      };
    }

    /**
     * parseFindFilter - Ensure validity of filter and return a mongo filter
     *
     * Supports the key operators `$or` and `$and` and the value operator `$in`.
     * Schema entries without a `type` are treated as nested objects and parsed
     * recursively.
     *
     * @param filter - Filter
     * @param schema - Schema of collection document
     * @param options - Parser options; `operators: false` disables operator parsing
     * @returns Promise returning the mongo filter, whether the filter references
     * restricted properties, and whether the query may be cached
     * @throws Error if the filter is empty or not an object, uses an unsupported
     * operator, or references a key that is not in the schema
     */
    private async parseFindFilter(
      filter: any,
      schema: any,
      options?: {
        operators?: boolean;
      }
    ): Promise<{
      mongoFilter: any;
      containsRestrictedProperties: boolean;
      canCache: boolean;
    }> {
      if (!filter || typeof filter !== "object")
        throw new Error(
          "Invalid filter provided. Filter must be an object."
        );

      const entries = Object.entries(filter);
      if (entries.length === 0)
        throw new Error(
          "Invalid filter provided. Filter must contain keys."
        );

      // Whether to parse operators or not
      const operators = options?.operators !== false;
      // The MongoDB filter we're building
      const mongoFilter: any = {};
      // True if the filter references any restricted property, so that find knows not to cache the query
      let containsRestrictedProperties = false;
      // Operators at the key level that we support right now
      const allowedKeyOperators = ["$or", "$and"];
      // Operators at the value level that we support right now
      const allowedValueOperators = ["$in"];

      for (const [key, value] of entries) {
        // Key must be at least 1 character
        if (!key)
          throw new Error(
            `Invalid filter provided. Key must be at least 1 character.`
          );

        // Handle key operators, which always start with a $
        if (operators && key.startsWith("$")) {
          if (!allowedKeyOperators.includes(key))
            throw new Error(
              `Invalid filter provided. Operator "${key}" is not allowed.`
            );

          // We currently only support $or and $and, but here we can have different logic for different operators
          if (key === "$or" || key === "$and") {
            if (!Array.isArray(value) || value.length === 0)
              throw new Error(
                `Key "${key}" must contain array of queries.`
              );

            // Parse every child filter, keeping them in the order they were given
            const parsedFilters = await Promise.all(
              value.map((childFilter: any) =>
                this.parseFindFilter(childFilter, schema, options)
              )
            );
            mongoFilter[key] = parsedFilters.map(
              parsed => parsed.mongoFilter
            );
            if (
              parsedFilters.some(
                parsed => parsed.containsRestrictedProperties
              )
            )
              containsRestrictedProperties = true;
          } else
            throw new Error(
              `Unhandled operator "${key}", this should never happen!`
            );

          continue;
        }

        // Here we handle any normal keys in the filter object
        if (!Object.hasOwn(schema, key))
          throw new Error(`Key "${key}" does not exist in the schema.`);

        const definition = schema[key];

        if (definition.restricted) containsRestrictedProperties = true;

        // A value of the form { $operator: ... } with exactly one key is a value operator
        const valueKeys =
          value && typeof value === "object" ? Object.keys(value) : [];
        const [onlyKey] = valueKeys;
        const operator =
          valueKeys.length === 1 && onlyKey?.startsWith("$")
            ? onlyKey
            : undefined;

        // Type will be undefined if it's a nested object
        if (definition.type === undefined) {
          const parsed = await this.parseFindFilter(
            value,
            definition,
            options
          );
          mongoFilter[key] = parsed.mongoFilter;
          if (parsed.containsRestrictedProperties)
            containsRestrictedProperties = true;
        } else if (operators && operator !== undefined) {
          if (!allowedValueOperators.includes(operator))
            throw new Error(
              `Invalid filter provided. Operator "${operator}" is not allowed.`
            );

          if (operator === "$in") {
            if (!Array.isArray(value.$in))
              throw new Error(
                `Key "${key}" must contain array of values for "$in".`
              );
            if (
              value.$in.length > 0 &&
              typeof definition.type !== "function"
            )
              throw new Error(`Invalid schema type for ${key}`);

            // TODO cast and validate every value against the schema type
            mongoFilter[key] = { $in: [...value.$in] };
          } else
            throw new Error(
              `Unhandled operator "${operator}", this should never happen!`
            );
        } else if (typeof definition.type === "function") {
          // TODO cast and validate the value against the schema type
          mongoFilter[key] = value;
        } else throw new Error(`Invalid schema type for ${key}`);
      }

      return {
        mongoFilter,
        containsRestrictedProperties,
        // A filter that references restricted properties must never be cached
        canCache: !containsRestrictedProperties
      };
    }

    // TODO improve caching
    // TODO add support for computed fields
    // TODO parse query - validation
    // TODO add proper typescript support
    // TODO add proper jsdoc
    // TODO add support for enum document attributes
    // TODO add support for array document attributes
    // TODO add support for reference document attributes
    // TODO fix 2nd layer of schema

    /**
     * find - Get one or more document(s) from a single collection
     *
     * @param context - Job context (not used by this job)
     * @param payload - Payload
     * @param payload.collection - Collection name
     * @param payload.filter - Filter, similar to a MongoDB filter
     * @param payload.projection - Properties to include, as an array of paths or an object
     * @param payload.limit - Maximum number of documents; with 1 a single document (or null) is returned
     * @param payload.page - 1-based page number, used together with limit to skip documents
     * @param payload.useCache - Set to false to bypass the Redis cache
     * @returns A single document or null when limit is 1, otherwise an array of documents
     * @throws Error if the collection is missing or unknown, or the filter is invalid
     */
    public async find<CollectionNameType extends keyof Collections>(
      context: JobContext,
      {
        collection, // Collection name
        filter, // Similar to MongoDB filter
        projection,
        limit = 0, // TODO have limit off by default?
        page = 1,
        useCache = true
      }: {
        collection: CollectionNameType;
        filter: Record<string, any>;
        projection?: Record<string, any> | string[];
        values?: Record<string, any>;
        limit?: number;
        page?: number;
        useCache?: boolean;
      }
    ): Promise<any | null> {
      // Verify whether the collection exists
      if (!collection) throw new Error("No collection specified");
      if (!this.collections)
        throw new Error("Collections have not been loaded");
      const target = this.collections[collection];
      if (!target) throw new Error("Collection not found");
      const schemaDocument = target.schema.document;

      // Validate the filter and projection, and turn them into their Mongo equivalents
      const parsedFilter = await this.parseFindFilter(filter, schemaDocument);
      const parsedProjection = await this.parseFindProjection(
        projection,
        schemaDocument
      );
      const { mongoFilter } = parsedFilter;
      const { mongoProjection } = parsedProjection;

      // Only cache when allowed by the caller and no restricted property is involved
      const cacheable =
        useCache !== false &&
        parsedFilter.canCache &&
        parsedProjection.canCache;

      // Turn the query into a sha1 hash that can be used as a Redis key
      const cacheKey = cacheable
        ? `query.find.${hash(
            { collection, mongoFilter, limit, page },
            { algorithm: "sha1" }
          )}`
        : null;

      // If we can use the cache, try to get the documents from there first
      let cachedDocuments: any[] | null = null;
      if (cacheKey) {
        const cachedQuery = await this.redis?.GET(cacheKey);
        const parsedCache: unknown = cachedQuery
          ? JSON.parse(cachedQuery)
          : null;
        // Anything other than an array is not something we stored, so treat it as a miss
        if (Array.isArray(parsedCache)) cachedDocuments = parsedCache;
      }

      // If we didn't get documents from the cache, get them from mongo
      // TODO, add mongo projection. Make sure to keep in mind caching with queryHash.
      const foundDocuments: any[] =
        cachedDocuments ??
        (await target.model
          .find(mongoFilter, mongoProjection)
          .limit(limit)
          .skip((page - 1) * limit));

      // Convert documents from Mongoose model to regular objects
      const documents = foundDocuments.map((document: any) =>
        document._doc ? document._doc : document
      );

      // Add fresh results to the cache for 60 seconds, without waiting for Redis
      if (cacheKey && !cachedDocuments)
        void this.redis
          ?.SET(cacheKey, JSON.stringify(documents), { EX: 60 })
          .catch(() => {
            // Caching is best-effort: a failed write must not fail the query
          });

      // Strip the documents of any unneeded properties or properties that are restricted
      const strippedDocuments = await Promise.all(
        documents.map((document: any) =>
          this.stripDocument(document, schemaDocument, projection)
        )
      );

      if (strippedDocuments.length === 0) return limit === 1 ? null : [];
      return limit === 1 ? strippedDocuments[0] : strippedDocuments;
    }
  }
  /** The methods of DataModule selected by UniqueMethods. */
  type DataModuleMethods = UniqueMethods<DataModule>;

  /**
   * Job map for the data module: for every method selected by
   * UniqueMethods<DataModule>, the type of its second parameter (the payload)
   * and the awaited type of its return value.
   */
  export type DataModuleJobs = {
    [JobName in keyof DataModuleMethods]: {
      payload: Parameters<DataModuleMethods[JobName]>[1];
      returns: Awaited<ReturnType<DataModuleMethods[JobName]>>;
    };
  };