DataModule.ts 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225
  1. import config from "config";
  2. import { Db, MongoClient, ObjectId } from "mongodb";
  3. import { createHash } from "node:crypto";
  4. import { createClient, RedisClientType } from "redis";
  5. import JobContext from "../JobContext";
  6. import BaseModule from "../BaseModule";
  7. import Schema, { Types } from "../Schema";
  8. import { Collections } from "../types/Collections";
  9. import { Document as SchemaDocument } from "../types/Document";
  10. import { UniqueMethods } from "../types/Modules";
  11. import { AttributeValue } from "../types/AttributeValue";
  /**
   * Array of `[key, value]` tuples, one tuple type per property of `T`.
   */
  type Entries<T> = {
    [K in keyof T]: [K, T[K]];
  }[keyof T][];

  /**
   * Object form of a projection. Each property maps to a boolean, a list of
   * property names, or a nested projection object.
   */
  interface ProjectionObject {
    [property: string]: ProjectionObject | string[] | boolean;
  }

  /**
   * A raw projection as accepted by this module: nothing at all, a list of
   * property names, or a (possibly nested) projection object.
   */
  type Projection = ProjectionObject | string[] | null | undefined;

  /**
   * A projection flattened into `[path, boolean]` pairs, together with the
   * mode that says how paths missing from the list are treated.
   */
  type NormalizedProjection = {
    projection: [string, boolean][];
    mode: "includeAllBut" | "excludeAllBut";
  };

  /**
   * Filter object: every property holds an attribute value, a list of
   * attribute values, a nested filter, or a list of nested filters.
   */
  interface MongoFilter {
    [property: string]:
      | MongoFilter
      | MongoFilter[]
      | AttributeValue
      | AttributeValue[];
  }

  /**
   * Document object: every property holds an attribute value, a list of
   * attribute values, a nested document, or a list of nested documents.
   */
  interface Document {
    [property: string]:
      | Document
      | Document[]
      | AttributeValue
      | AttributeValue[];
  }

  /**
   * Which restricted properties may be accessed: `true` allows all of them,
   * an array allows the listed property names, and any falsy value allows none.
   */
  type AllowedRestricted = string[] | boolean | null | undefined;
  38. export default class DataModule extends BaseModule {
  39. private collections?: Collections;
  40. private mongoClient?: MongoClient;
  41. private mongoDb?: Db;
  42. private redisClient?: RedisClientType;
  /**
   * Data Module
   *
   * Passes the identifier "data" to the BaseModule constructor
   * (presumably the module name - confirm in BaseModule).
   */
  public constructor() {
    super("data");
  }
  /**
   * startup - Startup data module
   *
   * Connects to MongoDB using the "mongo.url" config value, loads the
   * collections, connects to Redis using the url from the "redis" config,
   * and finally verifies the Redis "notify-keyspace-events" setting before
   * marking the module as started.
   *
   * @throws Error if "notify-keyspace-events" is not exactly "xE"
   */
  public override async startup() {
    await super.startup();

    // MongoDB: connect, take the database returned by db(), load the collections
    const mongoClient = new MongoClient(config.get<string>("mongo.url"));
    this.mongoClient = mongoClient;
    await mongoClient.connect();
    this.mongoDb = mongoClient.db();
    await this.loadCollections();

    // Redis: connect with the configured url
    const redisConfig = config.get<{ url: string }>("redis");
    this.redisClient = createClient({ url: redisConfig.url });
    await this.redisClient.connect();

    // The setting's value is read from the second element of the CONFIG GET reply
    const keyspaceEventsReply = await this.redisClient.sendCommand([
      "CONFIG",
      "GET",
      "notify-keyspace-events"
    ]);
    const keyspaceEvents = Array.isArray(keyspaceEventsReply)
      ? keyspaceEventsReply[1]
      : undefined;

    if (keyspaceEvents !== "xE")
      throw new Error(
        `notify-keyspace-events is NOT configured correctly! It is set to: ${
          keyspaceEvents || "unknown"
        }`
      );

    await super.started();
  }
  /**
   * shutdown - Shutdown data module
   *
   * Runs the base shutdown first, then quits the Redis client and closes
   * the Mongo client, each only if it was created.
   */
  public override async shutdown() {
    await super.shutdown();

    await this.redisClient?.quit();
    await this.mongoClient?.close(false);
  }
  /**
   * loadCollection - Import and load collection schema
   *
   * Dynamically imports the default export of `../collections/<name>` as the
   * schema and pairs it with the Mongo collection of the same name.
   * Relies on the Mongo database handle having been set beforehand.
   *
   * @param collectionName - Name of the collection
   * @returns Object holding the schema and the Mongo collection
   */
  private async loadCollection<T extends keyof Collections>(
    collectionName: T
  ) {
    const name = collectionName.toString();

    const imported: { default: Schema } = await import(
      `../collections/${name}`
    );

    return {
      schema: imported.default,
      collection: this.mongoDb!.collection(name)
    };
  }
  /**
   * loadCollections - Load and initialize all collections
   *
   * Loads the "abc" and "station" collections one after the other and
   * stores them on the module.
   *
   * @returns Promise
   */
  private async loadCollections() {
    const abc = await this.loadCollection("abc");
    const station = await this.loadCollection("station");

    this.collections = { abc, station };
  }
  /**
   * Takes a raw projection and turns it into a projection we can easily use
   *
   * The raw projection is flattened into `[path, boolean]` pairs, checked for
   * path collisions, and given a mode:
   * - "excludeAllBut" when any path other than `_id` is set to true
   * - "includeAllBut" otherwise
   *
   * @param projection - The raw projection; anything that is not an object or array is treated as an empty projection
   * @returns Normalized projection
   * @throws Error when the same path occurs twice, or when a path collides with one of its own parent paths (e.g. prop1.prop2 and prop1.prop2.prop3)
   */
  private normalizeProjection(projection: Projection): NormalizedProjection {
    // Arrays are objects too, so this keeps both arrays and projection objects
    const initialProjection: Projection =
      projection && typeof projection === "object" ? projection : [];

    // Flatten into key-value pairs and make sure every value is a boolean
    const flattenedProjection = this.flattenProjection(initialProjection).map(
      ([key, value]): [string, boolean] => [key, !!value]
    );

    // A Set drops duplicates, so a size mismatch means a 1:1 duplicate key
    const projectionKeys = new Set(flattenedProjection.map(([key]) => key));
    if (projectionKeys.size !== flattenedProjection.length)
      throw new Error("Path collision, non-unique key");

    // Walk up every nested path one layer at a time and make sure none of its
    // parent paths is itself a projection key. Set lookups keep this linear in
    // the number of path layers instead of scanning the key list per layer.
    for (const key of projectionKeys) {
      let separatorIndex = key.lastIndexOf(".");
      while (separatorIndex !== -1) {
        const subKey = key.substring(0, separatorIndex);
        if (projectionKeys.has(subKey))
          throw new Error(`Path collision! ${key} collides with ${subKey}`);
        separatorIndex = subKey.lastIndexOf(".");
      }
    }

    // Check if we explicitly allow anything (with the exception of _id)
    const anyNonIdTrues = flattenedProjection.some(
      ([key, value]) => value && key !== "_id"
    );

    return {
      projection: flattenedProjection,
      mode: anyNonIdTrues ? "excludeAllBut" : "includeAllBut"
    };
  }
  /**
   * Flatten the projection we've been given (which can be an array or an object) into an array of key/value pairs
   *
   * Array entries become `[key, true]`. Nested arrays/objects are flattened
   * recursively, with their keys prefixed by the parent key and a ".".
   *
   * @param projection - Projection
   * @returns Flat list of `[path, value]` pairs
   * @throws Error when the projection (at any level) is null or otherwise falsy
   */
  private flattenProjection(projection: Projection): [string, boolean][] {
    if (!projection) throw new Error("Projection can't be null");

    // First layer: turn the array/object into key/value pairs
    let entries: [string, boolean | string[] | ProjectionObject][];
    if (Array.isArray(projection))
      entries = projection.map(key => [key, true]);
    else if (typeof projection === "object")
      entries = Object.entries(projection);
    else entries = [];

    // Deeper layers: anything that is still an object gets flattened in place
    return entries.flatMap(([key, value]): [string, boolean][] => {
      if (typeof value === "object")
        return this.flattenProjection(value).map(
          ([nestedKey, nestedValue]): [string, boolean] => [
            `${key}.${nestedKey}`,
            nestedValue
          ]
        );

      return [[key, value]];
    });
  }
  /**
   * Build the Mongo projection for a find, based on the schema and which restricted properties are allowed
   *
   * For every property in the schema:
   * - restricted and allowed: nothing is excluded, but the result can't be cached
   * - restricted and not allowed: the property is excluded from the Mongo projection
   * - nested schema: parsed recursively, one level deeper
   *
   * Within this method the given projection is only narrowed and handed down
   * to nested schemas.
   *
   * @param projection - The normalized projection
   * @param schema - The schema object
   * @param allowedRestricted - Which restricted properties are allowed
   * @returns The Mongo projection, and whether the result can be cached
   * @throws Error when a property of type schema has no schema defined
   */
  private async parseFindProjection(
    projection: NormalizedProjection,
    schema: SchemaDocument,
    allowedRestricted: AllowedRestricted
  ) {
    // The mongo projection object we're going to build
    const mongoProjection: ProjectionObject = {};

    // This will be false if we let Mongo return any restricted properties
    let canCache = true;

    await Promise.all(
      Object.entries(schema).map(async ([key, value]) => {
        if (value.restricted) {
          // Restricted and allowed: Mongo may return it, so find can't use the cache.
          // Restricted and not allowed: have Mongo exclude it, which keeps caching safe.
          if (this.allowedByRestricted(allowedRestricted, key))
            canCache = false;
          else mongoProjection[key] = false;
          return;
        }

        // Only nested schemas need any further work
        if (value.type !== Types.Schema) return;

        // Projection and allowedRestricted for the next layer
        const deeperProjection = this.getDeeperProjection(projection, key);
        const deeperAllowedRestricted = this.getDeeperAllowedRestricted(
          allowedRestricted,
          key
        );

        if (!value.schema) throw new Error("Schema is not defined");

        // Parse the projection one level deeper
        const nested = await this.parseFindProjection(
          deeperProjection,
          value.schema,
          deeperAllowedRestricted
        );

        // Only take over the nested mongo projection if it contains anything
        if (Object.keys(nested.mongoProjection).length > 0)
          mongoProjection[key] = nested.mongoProjection;

        // If the nested level can't use the cache, neither can we
        canCache = canCache && nested.canCache;
      })
    );

    return { canCache, mongoProjection };
  }
  /**
   * Whether a property is allowed if it's restricted
   *
   * @param allowedRestricted - `true` to allow every restricted property, an array of allowed property names, or a falsy value to allow none
   * @param property - Property name
   * @returns True when the property is allowed
   */
  private allowedByRestricted(
    allowedRestricted: AllowedRestricted,
    property: string
  ) {
    // All restricted properties are allowed
    if (allowedRestricted === true) return true;

    // Otherwise only an array that lists this exact property allows it;
    // false, null, undefined and any other non-array value allow nothing
    return (
      Array.isArray(allowedRestricted) &&
      allowedRestricted.includes(property)
    );
  }
  /**
   * Whether a property is allowed in a normalized projection
   *
   * - "excludeAllBut": allowed when the property is explicitly set to true, or when any path nested below it is set to true
   * - "includeAllBut": allowed unless the property is explicitly set to false
   *
   * @param projection - The normalized projection
   * @param property - Property name
   * @returns True when the property is allowed at this level
   */
  private allowedByProjection(
    projection: NormalizedProjection,
    property: string
  ) {
    // Look the property up in a Map rather than a plain object: a plain object
    // also answers for inherited names such as "constructor" or "toString",
    // which would make those properties look explicitly included
    const explicitValue = new Map(projection.projection).get(property);

    if (projection.mode === "excludeAllBut") {
      // Only allow if explicitly allowed
      if (explicitValue) return true;

      // If this is a nested property that has any allowed properties at some lower level, allow at this level
      const nestedPrefix = `${property}.`;
      return projection.projection.some(
        ([key, value]) => value && key.startsWith(nestedPrefix)
      );
    }

    // Allowed unless explicitly excluded
    if (projection.mode === "includeAllBut") return explicitValue !== false;

    // This should never happen
    return false;
  }
  /**
   * Returns the normalized projection that is one level deeper based on the property key
   *
   * Only paths that start with `<currentKey>.` are kept, with that prefix
   * removed. The mode is carried over unchanged.
   *
   * @param projection - The normalized projection
   * @param currentKey - The property key to go one level below
   * @returns Normalized projection for the next level
   */
  private getDeeperProjection(
    projection: NormalizedProjection,
    currentKey: string
  ): NormalizedProjection {
    const prefix = `${currentKey}.`;

    const newProjection = projection.projection
      // Keep only paths below currentKey that have something left after the prefix
      .filter(
        ([key]) => key.startsWith(prefix) && key.length > prefix.length
      )
      // Strip exactly the prefix; cutting at the first "." of the path would
      // return the wrong remainder whenever currentKey itself contains a "."
      .map(([key, value]): [string, boolean] => [
        key.substring(prefix.length),
        value
      ]);

    // Return the new projection array with the same existing mode
    return { projection: newProjection, mode: projection.mode };
  }
  /**
   * Returns the allowedRestricted that is one level deeper based on the property key
   *
   * A boolean is passed down as-is, and anything that is neither a boolean nor
   * an array becomes false. For an array, only paths that start with
   * `<currentKey>.` are kept, with that prefix removed.
   *
   * @param allowedRestricted - Which restricted properties are allowed at the current level
   * @param currentKey - The property key to go one level below
   * @returns allowedRestricted for the next level
   */
  private getDeeperAllowedRestricted(
    allowedRestricted: AllowedRestricted,
    currentKey: string
  ): AllowedRestricted {
    if (typeof allowedRestricted === "boolean") return allowedRestricted;
    if (!Array.isArray(allowedRestricted)) return false;

    const prefix = `${currentKey}.`;

    return (
      allowedRestricted
        // Keep only paths below currentKey that have something left after the prefix
        .filter(key => key.startsWith(prefix) && key.length > prefix.length)
        // Strip exactly the prefix; cutting at the first "." of the path would
        // return the wrong remainder whenever currentKey itself contains a "."
        .map(key => key.substring(prefix.length))
    );
  }
  /**
   * Cast a value to the type the schema expects
   *
   * Supports the String, Number, Date, Boolean and ObjectId schema types.
   * null and undefined are always returned as null.
   *
   * @param value - The value to cast
   * @param schemaType - The schema type to cast to
   * @returns The casted value, or null when the value is null/undefined
   * @throws Error when a number casts to NaN, a date is invalid, an ObjectId value is neither a string nor an ObjectId, or the schema type is not supported
   */
  private getCastedValue(value: unknown, schemaType: Types): AttributeValue {
    if (value === null || value === undefined) return null;

    switch (schemaType) {
      case Types.String:
        // Check if value is a string, and if not, convert the value to a string
        return typeof value === "string" ? value : String(value);

      case Types.Number: {
        // Check if value is a number, and if not, convert the value to a number
        const castedValue = typeof value === "number" ? value : Number(value);

        // We don't allow NaN for numbers, so throw an error
        if (Number.isNaN(castedValue))
          throw new Error(
            `Cast error, number cannot be NaN, with value ${value}`
          );

        return castedValue;
      }

      case Types.Date: {
        const isInvalidDate = (date: Date) =>
          new Date(date).toString() === "Invalid Date";

        // Check if value is a Date, and if not, convert the value to a Date
        let castedValue =
          Object.prototype.toString.call(value) === "[object Date]"
            ? (value as Date)
            : new Date(value.toString());

        // A numeric timestamp turned into a string does not parse as a date,
        // so if string parsing failed for a finite number, use it as a timestamp.
        // Values that already parsed as a string keep their existing result.
        if (
          isInvalidDate(castedValue) &&
          typeof value === "number" &&
          Number.isFinite(value)
        )
          castedValue = new Date(value);

        // We don't allow invalid dates, so throw an error
        if (isInvalidDate(castedValue))
          throw new Error(
            `Cast error, date cannot be invalid, with value ${value}`
          );

        return castedValue;
      }

      case Types.Boolean:
        // Non-boolean values are converted by truthiness, so any non-empty
        // string (including "false") becomes true
        return typeof value === "boolean" ? value : Boolean(value);

      case Types.ObjectId:
        if (typeof value !== "string" && !(value instanceof ObjectId))
          throw new Error(
            `Cast error, ObjectId invalid, with value ${value}`
          );

        // Let the ObjectId constructor from the mongodb driver handle the rest
        return new ObjectId(value);

      default:
        throw new Error(
          `Unsupported schema type found with type ${Types[schemaType]}. This should never happen.`
        );
    }
  }
  467. /**
  468. * parseFindFilter - Ensure validity of filter and return a mongo filter
  469. *
  470. * @param filter - Filter
  471. * @param schema - Schema of collection document
  472. * @param options - Parser options
  473. * @returns Promise returning object with query values cast to schema types
  474. * and whether query includes restricted attributes
  475. */
  476. private async parseFindFilter(
  477. filter: MongoFilter,
  478. schema: SchemaDocument,
  479. allowedRestricted: AllowedRestricted,
  480. options?: {
  481. operators?: boolean;
  482. }
  483. ): Promise<{
  484. mongoFilter: MongoFilter;
  485. containsRestrictedProperties: boolean;
  486. canCache: boolean;
  487. }> {
  488. if (!filter || typeof filter !== "object")
  489. throw new Error(
  490. "Invalid filter provided. Filter must be an object."
  491. );
  492. const keys = Object.keys(filter);
  493. if (keys.length === 0)
  494. throw new Error(
  495. "Invalid filter provided. Filter must contain keys."
  496. );
  497. // Whether to parse operators or not
  498. const operators = !(options && options.operators === false);
  499. // The MongoDB filter we're building
  500. const mongoFilter: MongoFilter = {};
  501. // If the filter references any properties that are restricted, this will be true, so that find knows not to cache the query object
  502. let containsRestrictedProperties = false;
  503. // Whether this filter is cachable or not
  504. let canCache = true;
  505. // Operators at the key level that we support right now
  506. const allowedKeyOperators = ["$or", "$and"];
  507. // Operators at the value level that we support right now
  508. const allowedValueOperators = ["$in"];
  509. // Loop through all key/value properties
  510. await Promise.all(
  511. Object.entries(filter).map(async ([key, value]) => {
  512. // Key must be 1 character and exist
  513. if (!key || key.length === 0)
  514. throw new Error(
  515. `Invalid filter provided. Key must be at least 1 character.`
  516. );
  517. // Handle key operators, which always start with a $
  518. if (operators && key[0] === "$") {
  519. // Operator isn't found, so throw an error
  520. if (allowedKeyOperators.indexOf(key) === -1)
  521. throw new Error(
  522. `Invalid filter provided. Operator "${key}" is not allowed.`
  523. );
  524. // We currently only support $or and $and, but here we can have different logic for different operators
  525. if (key === "$or" || key === "$and") {
  526. // $or and $and should always be an array, so check if it is
  527. if (!Array.isArray(value) || value.length === 0)
  528. throw new Error(
  529. `Key "${key}" must contain array of filters.`
  530. );
  531. // Add the operator to the mongo filter object as an empty array
  532. mongoFilter[key] = [];
  533. // Run parseFindQuery again for child objects and add them to the mongo filter operator array
  534. await Promise.all(
  535. value.map(async _value => {
  536. // Value must be an actual object, so if it's not, throw an error
  537. if (
  538. !_value ||
  539. typeof _value !== "object" ||
  540. _value.constructor.name !== "Object"
  541. )
  542. throw Error("not an object");
  543. const {
  544. mongoFilter: _mongoFilter,
  545. containsRestrictedProperties:
  546. _containsRestrictedProperties
  547. } = await this.parseFindFilter(
  548. _value as MongoFilter,
  549. schema,
  550. allowedRestricted,
  551. options
  552. );
  553. // Actually add the returned filter object to the mongo filter we're building
  554. (<MongoFilter[]>mongoFilter[key]).push(
  555. _mongoFilter
  556. );
  557. if (_containsRestrictedProperties)
  558. containsRestrictedProperties = true;
  559. })
  560. );
  561. } else
  562. throw new Error(
  563. `Unhandled operator "${key}", this should never happen!`
  564. );
  565. } else {
  566. // Here we handle any normal keys in the query object
  567. let currentKey = key;
  568. // If the key doesn't exist in the schema, throw an error
  569. if (!Object.hasOwn(schema, key)) {
  570. if (key.indexOf(".") !== -1) {
  571. currentKey = key.substring(0, key.indexOf("."));
  572. if (!Object.hasOwn(schema, currentKey))
  573. throw new Error(
  574. `Key "${currentKey}" does not exist in the schema.`
  575. );
  576. if (
  577. schema[currentKey].type !== Types.Schema &&
  578. (schema[currentKey].type !== Types.Array ||
  579. (schema[currentKey].item!.type !==
  580. Types.Schema &&
  581. schema[currentKey].item!.type !==
  582. Types.Array))
  583. )
  584. throw new Error(
  585. `Key "${currentKey}" is not a schema/array.`
  586. );
  587. } else
  588. throw new Error(
  589. `Key "${key}" does not exist in the schema.`
  590. );
  591. }
  592. const { restricted } = schema[currentKey];
  593. // Check if the current property is allowed or not based on allowedRestricted
  594. const allowedByRestricted =
  595. !restricted ||
  596. this.allowedByRestricted(allowedRestricted, currentKey);
  597. if (!allowedByRestricted)
  598. throw new Error(`Key "${currentKey}" is restricted.`);
  599. // If the key in the schema is marked as restricted, containsRestrictedProperties will be true
  600. if (restricted) containsRestrictedProperties = true;
  601. // Handle value operators
  602. if (
  603. operators &&
  604. typeof value === "object" &&
  605. value &&
  606. Object.keys(value).length === 1 &&
  607. Object.keys(value)[0] &&
  608. Object.keys(value)[0][0] === "$"
  609. ) {
  610. // This entire if statement is for handling value operators like $in
  611. const operator = Object.keys(value)[0];
  612. // Operator isn't found, so throw an error
  613. if (allowedValueOperators.indexOf(operator) === -1)
  614. throw new Error(
  615. `Invalid filter provided. Operator "${operator}" is not allowed.`
  616. );
  617. // Handle the $in value operator
  618. if (operator === "$in") {
  619. // Decide what type should be for the values for $in
  620. let { type } = schema[currentKey];
  621. // We don't allow schema type for $in
  622. if (type === Types.Schema)
  623. throw new Error(
  624. `Key "${currentKey}" is of type schema, which is not allowed with $in`
  625. );
  626. // Set the type to be the array item type if it's about an array
  627. if (type === Types.Array)
  628. type = schema[key].item!.type;
  629. const value$in = (<{ $in: AttributeValue[] }>value)
  630. .$in;
  631. let filter$in: AttributeValue[] = [];
  632. if (!Array.isArray(value$in))
  633. throw new Error("$in musr be array");
  634. // Loop through all $in array items, check if they're not null/undefined, cast them, and return a new array
  635. if (value$in.length > 0)
  636. filter$in = await Promise.all(
  637. value$in.map(async _value => {
  638. const isNullOrUndefined =
  639. _value === null ||
  640. _value === undefined;
  641. if (isNullOrUndefined)
  642. throw new Error(
  643. `Value for key ${currentKey} using $in is undefuned/null, which is not allowed.`
  644. );
  645. const castedValue = this.getCastedValue(
  646. _value,
  647. type
  648. );
  649. return castedValue;
  650. })
  651. );
  652. mongoFilter[currentKey] = { $in: filter$in };
  653. } else
  654. throw new Error(
  655. `Unhandled operator "${operator}", this should never happen!`
  656. );
  657. }
  658. // Handle schema type
  659. else if (schema[currentKey].type === Types.Schema) {
  660. let subFilter;
  661. if (key.indexOf(".") !== -1) {
  662. const subKey = key.substring(
  663. key.indexOf(".") + 1,
  664. key.length
  665. );
  666. subFilter = {
  667. [subKey]: value
  668. };
  669. } else subFilter = value;
  670. // Sub-filter must be an actual object, so if it's not, throw an error
  671. if (
  672. !subFilter ||
  673. typeof subFilter !== "object" ||
  674. subFilter.constructor.name !== "Object"
  675. )
  676. throw Error("not an object");
  677. // Get the allowedRestricted for the next layer
  678. const deeperAllowedRestricted =
  679. this.getDeeperAllowedRestricted(
  680. allowedRestricted,
  681. currentKey
  682. );
  683. // Run parseFindFilter on the nested schema object
  684. const {
  685. mongoFilter: _mongoFilter,
  686. containsRestrictedProperties:
  687. _containsRestrictedProperties
  688. } = await this.parseFindFilter(
  689. subFilter as MongoFilter,
  690. schema[currentKey].schema!,
  691. deeperAllowedRestricted,
  692. options
  693. );
  694. mongoFilter[currentKey] = _mongoFilter;
  695. if (_containsRestrictedProperties)
  696. containsRestrictedProperties = true;
  697. }
  698. // Handle array type
  699. else if (schema[currentKey].type === Types.Array) {
  700. const isNullOrUndefined =
  701. value === null || value === undefined;
  702. if (isNullOrUndefined)
  703. throw new Error(
  704. `Value for key ${currentKey} is an array item, so it cannot be null/undefined.`
  705. );
  706. // The type of the array items
  707. const itemType = schema[currentKey].item!.type;
  708. // Handle nested arrays, which are not supported
  709. if (itemType === Types.Array)
  710. throw new Error("Nested arrays not supported");
  711. // Handle schema array item type
  712. else if (itemType === Types.Schema) {
  713. let subFilter;
  714. if (key.indexOf(".") !== -1) {
  715. const subKey = key.substring(
  716. key.indexOf(".") + 1,
  717. key.length
  718. );
  719. subFilter = {
  720. [subKey]: value
  721. };
  722. } else subFilter = value;
  723. // Sub-filter must be an actual object, so if it's not, throw an error
  724. if (
  725. typeof subFilter !== "object" ||
  726. subFilter.constructor.name !== "Object"
  727. )
  728. throw Error("not an object");
  729. // Get the allowedRestricted for the next layer
  730. const deeperAllowedRestricted =
  731. this.getDeeperAllowedRestricted(
  732. allowedRestricted,
  733. currentKey
  734. );
  735. const {
  736. mongoFilter: _mongoFilter,
  737. containsRestrictedProperties:
  738. _containsRestrictedProperties
  739. } = await this.parseFindFilter(
  740. subFilter as MongoFilter,
  741. schema[currentKey].item!.schema!,
  742. deeperAllowedRestricted,
  743. options
  744. );
  745. mongoFilter[currentKey] = _mongoFilter;
  746. if (_containsRestrictedProperties)
  747. containsRestrictedProperties = true;
  748. }
  749. // Normal array item type
  750. else {
  751. // Value must not be an array, so if it is, throw an error
  752. if (Array.isArray(value)) throw Error("an array");
  753. // Value must not be an actual object, so if it is, throw an error
  754. if (
  755. typeof value === "object" &&
  756. value.constructor.name === "Object"
  757. )
  758. throw Error("an object");
  759. mongoFilter[currentKey] = this.getCastedValue(
  760. value as AttributeValue,
  761. itemType
  762. );
  763. }
  764. }
  765. // Handle normal types
  766. else {
  767. const isNullOrUndefined =
  768. value === null || value === undefined;
  769. if (isNullOrUndefined && schema[key].required)
  770. throw new Error(
  771. `Value for key ${key} is required, so it cannot be null/undefined.`
  772. );
  773. // If the value is null or undefined, just set it as null
  774. if (isNullOrUndefined) mongoFilter[key] = null;
  775. // Cast and validate values
  776. else {
  777. const schemaType = schema[key].type;
  778. // Value must not be an array, so if it is, throw an error
  779. if (Array.isArray(value)) throw Error("an array");
  780. // Value must not be an actual object, so if it is, throw an error
  781. if (
  782. typeof value === "object" &&
  783. value.constructor.name === "Object"
  784. )
  785. throw Error("an object");
  786. mongoFilter[key] = this.getCastedValue(
  787. value as AttributeValue,
  788. schemaType
  789. );
  790. }
  791. }
  792. }
  793. })
  794. );
  795. if (containsRestrictedProperties) canCache = false;
  796. return { mongoFilter, containsRestrictedProperties, canCache };
  797. }
  /**
   * Builds a copy of a document that only contains the properties the caller may see.
   *
   * A property is kept only when it exists in the schema, is allowed by the projection and,
   * if the schema marks it as restricted, is allowed by `allowedRestricted`.
   * Nested schema objects and arrays of schema objects are stripped recursively; every other
   * value is passed through `getCastedValue` together with its schema type.
   * Kept properties appear in the same order as in the input document.
   *
   * @param document - The document object to strip
   * @param schema - The schema object describing the document's properties
   * @param projection - The normalized projection that decides which keys are allowed
   * @param allowedRestricted - Decides which restricted properties may be returned
   * @returns A new object holding only the allowed (and casted) properties
   * @throws Error when a schema-typed value/item is not a plain object, when an array-typed
   * value is not an array, or when an array item is itself declared as an array
   */
  private async stripDocument(
    document: Document,
    schema: SchemaDocument,
    projection: NormalizedProjection,
    allowedRestricted: AllowedRestricted
  ): Promise<Document> {
    // Every callback resolves to zero or one [key, value] entries. The results are collected
    // by position instead of being pushed to a shared array after an await, so the output
    // keys keep the order of the input document regardless of which callback finishes first.
    const entriesPerKey = await Promise.all(
      Object.entries(document).map(
        async ([key, value]): Promise<Entries<Document>> => {
          // Properties that do not exist in the schema are never returned
          if (!schema[key]) return [];

          // Drop the property if the projection or the restricted rules do not allow it
          const allowedByProjection = this.allowedByProjection(
            projection,
            key
          );
          const allowedByRestricted =
            !schema[key].restricted ||
            this.allowedByRestricted(allowedRestricted, key);
          if (!allowedByProjection || !allowedByRestricted) return [];

          // Handle nested object
          if (schema[key].type === Types.Schema) {
            // If value is falsy, it can't be an object, so just return null
            if (!value) return [[key, null]];

            // Value must be an actual object, so if it's not, throw an error
            if (
              typeof value !== "object" ||
              value.constructor.name !== "Object"
            )
              throw Error("not an object");

            // Get the projection and the allowedRestricted for the next layer
            const deeperProjection = this.getDeeperProjection(projection, key);
            const deeperAllowedRestricted = this.getDeeperAllowedRestricted(
              allowedRestricted,
              key
            );

            // Generate a stripped document/object for the current key/value.
            // If nothing inside it may be returned this is simply an empty object.
            const strippedDocument = await this.stripDocument(
              value as Document, // Verified above to be a plain object, so this is for TypeScript to be happy
              schema[key].schema!,
              deeperProjection,
              deeperAllowedRestricted
            );

            return [[key, strippedDocument]];
          }

          // Handle array type
          if (schema[key].type === Types.Array) {
            // If value is falsy, return null with the key instead
            if (!value) return [[key, null]];

            // If value isn't a valid array, throw error
            if (!Array.isArray(value)) throw Error("not an array");

            // The type of the array items
            const itemType = schema[key].item!.type;

            const items = (await Promise.all(
              value.map(async item => {
                // Handle schema objects inside an array
                if (itemType === Types.Schema) {
                  // Item must be an actual object, so if it's not, throw an error
                  if (
                    !item ||
                    typeof item !== "object" ||
                    item.constructor.name !== "Object"
                  )
                    throw Error("not an object");

                  // Get the projection and the allowedRestricted for the next layer
                  const deeperProjection = this.getDeeperProjection(
                    projection,
                    key
                  );
                  const deeperAllowedRestricted =
                    this.getDeeperAllowedRestricted(allowedRestricted, key);

                  // Stripped version of the item; an empty object if nothing may be returned
                  return this.stripDocument(
                    item as Document, // Verified above to be a plain object, so this is for TypeScript to be happy
                    schema[key].item!.schema!,
                    deeperProjection,
                    deeperAllowedRestricted
                  );
                }

                // Nested arrays are not supported
                if (itemType === Types.Array)
                  throw new Error("Nested arrays not supported");

                // Handle normal types: null/undefined items become null, everything else is casted
                if (item === null || item === undefined) return null;
                return this.getCastedValue(item, itemType);
              })
            )) as AttributeValue[] | Document[];

            return [[key, items]];
          }

          // Handle normal types by casting the value to the schema type
          // NOTE(review): unlike array items, a null/undefined value is not checked before casting here — confirm getCastedValue handles that
          return [[key, this.getCastedValue(value, schema[key].type)]];
        }
      )
    );

    return Object.fromEntries(entriesPerKey.flat());
  }
  /**
   * find - Get one or more document(s) from a single collection
   *
   * Validates the paging parameters, parses the projection and the filter against the
   * collection's schema, and loads the matching documents. When the query is cacheable the
   * raw documents are read from Redis if present, otherwise they are queried from the
   * collection and written to Redis with a 60 second expiry. The documents are finally
   * stripped of anything the projection or `allowedRestricted` does not allow.
   *
   * @param context - Job context (not read by this method)
   * @param payload - Payload
   * @param payload.collection - Collection name
   * @param payload.filter - Filter, similar to a MongoDB filter
   * @param payload.projection - Optional projection
   * @param payload.allowedRestricted - Optional description of which restricted properties are allowed
   * @param payload.limit - Documents per page, an integer from 1 to 100 (default 1)
   * @param payload.page - 1-based page number (default 1)
   * @param payload.useCache - Set to false to skip the cache (default true)
   * @returns With a limit of 1: the stripped document, or null if nothing matched.
   * Otherwise: an array of stripped documents, empty if nothing matched.
   * @throws Error when page/limit are invalid, the collection is missing or unknown,
   * or the requested page lies beyond the last available page
   */
  public async find<CollectionNameType extends keyof Collections>(
    context: JobContext,
    {
      collection, // Collection name
      filter, // Similar to MongoDB filter
      projection,
      allowedRestricted,
      limit = 1,
      page = 1,
      useCache = true
    }: {
      collection: CollectionNameType;
      filter: MongoFilter;
      projection?: Projection;
      allowedRestricted?: boolean | string[];
      limit?: number;
      page?: number;
      useCache?: boolean;
    }
  ) {
    // Verify page and limit parameters
    if (page < 1) throw new Error("Page must be at least 1");
    if (limit < 1) throw new Error("Limit must be at least 1");
    if (limit > 100) throw new Error("Limit must not be greater than 100");
    // NaN, Infinity and fractions pass the range checks above, so reject them explicitly
    if (!Number.isInteger(page)) throw new Error("Page must be an integer");
    if (!Number.isInteger(limit)) throw new Error("Limit must be an integer");

    // Verify whether the collection exists, and get the schema.
    // This also covers the case where no collections have been loaded at all.
    if (!collection) throw new Error("No collection specified");
    const collectionEntry = this.collections?.[collection];
    if (!collectionEntry) throw new Error("Collection not found");
    const { schema } = collectionEntry;

    // Normalize the projection into something we understand, and which throws an error if we have any path collisions
    const normalizedProjection = this.normalizeProjection(projection);

    // Parse the projection into a mongo projection, and returns whether this query can be cached or not
    const parsedProjection = await this.parseFindProjection(
      normalizedProjection,
      schema.getDocument(),
      allowedRestricted
    );
    const { mongoProjection } = parsedProjection;

    // Parse the filter into a mongo filter, which also validates whether the filter is legal or not, and returns whether this query can be cached or not
    const parsedFilter = await this.parseFindFilter(
      filter,
      schema.getDocument(),
      allowedRestricted
    );
    const { mongoFilter } = parsedFilter;

    // Only cache when the caller allows it and neither the projection nor the filter forbids it
    const cacheable =
      useCache !== false && parsedProjection.canCache && parsedFilter.canCache;

    let cacheKey: string | null = null;
    let documents: Document[] | null = null;

    // If we can use cache, try to get the documents from the cache
    if (cacheable) {
      // Turn the query object into a md5 hash that can be used as a Redis key.
      // The projection is part of the key because the cached documents are the result of
      // querying with that projection, so another projection must not reuse them.
      const queryHash = createHash("md5")
        .update(
          JSON.stringify({
            collection,
            mongoFilter,
            mongoProjection,
            limit,
            page
          })
        )
        .digest("hex");
      cacheKey = `query.find.${queryHash}`;

      // Check if the query hash already exists in Redis, and get it if it is
      const cachedQuery = await this.redisClient?.GET(cacheKey);
      documents = cachedQuery ? JSON.parse(cachedQuery) : null;
    }

    // No cached documents, so get them from the collection
    if (!documents) {
      const totalCount =
        await collectionEntry.collection.countDocuments(mongoFilter);
      if (totalCount === 0) return limit === 1 ? null : [];

      const lastPage = Math.ceil(totalCount / limit);
      if (lastPage < page)
        throw new Error(`The last page available is ${lastPage}`);

      // Create the Mongo cursor and get the array of documents
      // NOTE(review): mongoProjection is passed as the second argument of find() — confirm it has the shape that argument expects
      documents = (await collectionEntry.collection
        .find(mongoFilter, mongoProjection)
        .limit(limit)
        .skip((page - 1) * limit)
        .toArray()) as Document[];

      // Adds query results to cache but doesn't await.
      // Caching is best-effort: a missing Redis client or a failed write must not fail the query,
      // and the rejection is handled so it cannot surface as an unhandled promise rejection.
      if (cacheKey) {
        this.redisClient
          ?.SET(cacheKey, JSON.stringify(documents), {
            EX: 60
          })
          .catch(() => undefined);
      }
    }

    // Strips the documents of any unneeded properties or properties that are restricted
    const strippedDocuments = await Promise.all(
      documents.map(async (document: Document) =>
        this.stripDocument(
          document,
          schema.getDocument(),
          normalizedProjection,
          allowedRestricted
        )
      )
    );

    if (strippedDocuments.length === 0) return limit === 1 ? null : [];

    return limit === 1 ? strippedDocuments[0] : strippedDocuments;
  }
  1065. }
  // The members of DataModule selected by UniqueMethods, named once so the mapped type below stays readable
  type DataModuleMethods = UniqueMethods<DataModule>;

  /**
   * Job description for every key of UniqueMethods<DataModule>:
   * `payload` is the type of the method's second parameter and
   * `returns` is the awaited type of the method's return value.
   */
  export type DataModuleJobs = {
    [MethodName in keyof DataModuleMethods]: {
      payload: Parameters<DataModuleMethods[MethodName]>[1];
      returns: Awaited<ReturnType<DataModuleMethods[MethodName]>>;
    };
  };