DataModule.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. import async from "async";
  2. import config from "config";
  3. import mongoose, { Schema } from "mongoose";
  4. import hash from "object-hash";
  5. import { createClient, RedisClientType } from "redis";
  6. import JobContext from "src/JobContext";
  7. import BaseModule from "../BaseModule";
  8. import ModuleManager from "../ModuleManager";
  9. import { UniqueMethods } from "../types/Modules";
  10. import { Collections } from "../types/Collections";
  11. export default class DataModule extends BaseModule {
  12. collections?: Collections;
  13. redis?: RedisClientType;
  14. /**
  15. * Data Module
  16. *
  17. * @param moduleManager - Module manager class
  18. */
  19. public constructor(moduleManager: ModuleManager) {
  20. super(moduleManager, "data");
  21. }
  22. /**
  23. * startup - Startup data module
  24. */
  25. public override startup(): Promise<void> {
  26. return new Promise((resolve, reject) => {
  27. async.waterfall(
  28. [
  29. async () => super.startup(),
  30. async () => {
  31. const mongoUrl = config.get<string>("mongo.url");
  32. return mongoose.connect(mongoUrl);
  33. },
  34. async () => this.loadCollections(),
  35. async () => {
  36. if (this.collections) {
  37. await async.each(
  38. Object.values(this.collections),
  39. async collection =>
  40. collection.model.syncIndexes()
  41. );
  42. } else
  43. throw new Error("Collections have not been loaded");
  44. },
  45. async () => {
  46. const { url, password } = config.get<{
  47. url: string;
  48. password: string;
  49. }>("redis");
  50. this.redis = createClient({
  51. url,
  52. password
  53. });
  54. return this.redis.connect();
  55. },
  56. async () => {
  57. if (!this.redis)
  58. throw new Error("Redis connection not established");
  59. return this.redis.sendCommand([
  60. "CONFIG",
  61. "GET",
  62. "notify-keyspace-events"
  63. ]);
  64. },
  65. async (redisConfigResponse: string[]) => {
  66. if (
  67. !(
  68. Array.isArray(redisConfigResponse) &&
  69. redisConfigResponse[1] === "xE"
  70. )
  71. )
  72. throw new Error(
  73. `notify-keyspace-events is NOT configured correctly! It is set to: ${
  74. (Array.isArray(redisConfigResponse) &&
  75. redisConfigResponse[1]) ||
  76. "unknown"
  77. }`
  78. );
  79. },
  80. async () => super.started()
  81. ],
  82. err => {
  83. if (err) reject(err);
  84. else resolve();
  85. }
  86. );
  87. });
  88. }
  89. /**
  90. * shutdown - Shutdown data module
  91. */
  92. public override shutdown(): Promise<void> {
  93. return new Promise(resolve => {
  94. super
  95. .shutdown()
  96. .then(async () => {
  97. // TODO: Ensure the following shutdown correctly
  98. if (this.redis) await this.redis.quit();
  99. await mongoose.connection.close(false);
  100. })
  101. .finally(() => resolve());
  102. });
  103. }
  104. /**
  105. * loadColllection - Import and load collection schema
  106. *
  107. * @param collectionName - Name of the collection
  108. * @returns Collection
  109. */
  110. private loadCollection<T extends keyof Collections>(
  111. collectionName: T
  112. ): Promise<Collections[T]> {
  113. return new Promise(resolve => {
  114. import(`../collections/${collectionName.toString()}`).then(
  115. ({ schema }: { schema: Collections[T]["schema"] }) => {
  116. const mongoSchema = new Schema<
  117. Collections[T]["schema"]["document"]
  118. >(schema.document, {
  119. timestamps: schema.timestamps
  120. });
  121. const model = mongoose.model(
  122. collectionName.toString(),
  123. mongoSchema
  124. );
  125. // @ts-ignore
  126. resolve({
  127. // @ts-ignore
  128. schema,
  129. // @ts-ignore
  130. model
  131. });
  132. }
  133. );
  134. });
  135. }
  136. /**
  137. * loadCollections - Load and initialize all collections
  138. *
  139. * @returns Promise
  140. */
  141. private loadCollections(): Promise<void> {
  142. return new Promise((resolve, reject) => {
  143. const fetchCollections = async () => ({
  144. abc: await this.loadCollection("abc")
  145. });
  146. fetchCollections()
  147. .then(collections => {
  148. this.collections = collections;
  149. resolve();
  150. })
  151. .catch(err => {
  152. reject(new Error(err));
  153. });
  154. });
  155. }
  156. // TODO split core into parseDocument(document, schema, { partial: boolean; })
  157. /**
  158. * parseQuery - Ensure validity of query and return a mongo query, or the document itself re-constructed
  159. *
  160. * @param query - Query
  161. * @param schema - Schema of collection document
  162. * @param options - Parser options
  163. * @returns Promise returning object with query values cast to schema types
  164. * and whether query includes restricted attributes
  165. */
  166. private async parseQuery(
  167. query: any,
  168. schema: any,
  169. options?: {
  170. operators?: boolean;
  171. }
  172. ): Promise<{ castQuery: any; restricted: boolean }> {
  173. if (!query || typeof query !== "object")
  174. throw new Error("Invalid query provided. Query must be an object.");
  175. const keys = Object.keys(query);
  176. if (keys.length === 0)
  177. throw new Error("Invalid query provided. Query must contain keys.");
  178. // Whether to parse operators or not
  179. const operators = !(options && options.operators === false);
  180. // The MongoDB query we're building
  181. const castQuery: any = {};
  182. // If the query references any fields that are restricted, this will be true, so that find knows not to cache the query object
  183. let restricted = false;
  184. // Operators at the key level that we support right now
  185. const allowedKeyOperators = ["$or", "$and"];
  186. // Operators at the value level that we support right now
  187. const allowedValueOperators = ["$in"];
  188. await async.each(Object.entries(query), async ([key, value]) => {
  189. // Key must be 1 character and exist
  190. if (!key || key.length === 0)
  191. throw new Error(
  192. `Invalid query provided. Key must be at least 1 character.`
  193. );
  194. // Handle key operators, which always start with a $
  195. if (operators && key[0] === "$") {
  196. // Operator isn't found, so throw an error
  197. if (allowedKeyOperators.indexOf(key) === -1)
  198. throw new Error(
  199. `Invalid query provided. Operator "${key}" is not allowed.`
  200. );
  201. // We currently only support $or and $and, but here we can have different logic for different operators
  202. if (key === "$or" || key === "$and") {
  203. // $or and $and should always be an array, so check if it is
  204. if (!Array.isArray(value) || value.length === 0)
  205. throw new Error(
  206. `Key "${key}" must contain array of queries.`
  207. );
  208. // Add the operator to the mongo query object as an empty array
  209. castQuery[key] = [];
  210. // Run parseQuery again for child objects and add them to the mongo query operator array
  211. await async.each(value, async _value => {
  212. const {
  213. castQuery: _castQuery,
  214. restricted: _restricted
  215. } = await this.parseQuery(_value, schema, options);
  216. // Actually add the returned query object to the mongo query we're building
  217. castQuery[key].push(_castQuery);
  218. if (_restricted) restricted = true;
  219. });
  220. } else
  221. throw new Error(
  222. `Unhandled operator "${key}", this should never happen!`
  223. );
  224. } else {
  225. // Here we handle any normal keys in the query object
  226. // If the key doesn't exist in the schema, throw an error
  227. if (!Object.hasOwn(schema, key))
  228. throw new Error(
  229. `Key "${key} does not exist in the schema."`
  230. );
  231. // If the key in the schema is marked as restricted, mark the entire query as restricted
  232. if (schema[key].restricted) restricted = true;
  233. // Type will be undefined if it's a nested object
  234. if (schema[key].type === undefined) {
  235. // Run parseQuery on the nested schema object
  236. const { castQuery: _castQuery, restricted: _restricted } =
  237. await this.parseQuery(value, schema[key], options);
  238. castQuery[key] = _castQuery;
  239. if (_restricted) restricted = true;
  240. } else if (
  241. operators &&
  242. typeof value === "object" &&
  243. value &&
  244. Object.keys(value).length === 1 &&
  245. Object.keys(value)[0] &&
  246. Object.keys(value)[0][0] === "$"
  247. ) {
  248. // This entire if statement is for handling value operators
  249. // Operator isn't found, so throw an error
  250. if (allowedValueOperators.indexOf(key) === -1)
  251. throw new Error(
  252. `Invalid query provided. Operator "${key}" is not allowed.`
  253. );
  254. // Handle the $in value operator
  255. if (value.$in) {
  256. castQuery[key] = {
  257. $in: []
  258. };
  259. if (value.$in.length > 0)
  260. castQuery[key].$in = await async.map(
  261. value.$in,
  262. async (_value: any) => {
  263. if (
  264. typeof schema[key].type === "function"
  265. ) {
  266. const Type = schema[key].type;
  267. const castValue = new Type(_value);
  268. if (schema[key].validate)
  269. await schema[key]
  270. .validate(castValue)
  271. .catch(err => {
  272. throw new Error(
  273. `Invalid value for ${key}, ${err}`
  274. );
  275. });
  276. return castValue;
  277. }
  278. throw new Error(
  279. `Invalid schema type for ${key}`
  280. );
  281. }
  282. );
  283. } else
  284. throw new Error(
  285. `Unhandled operator "${
  286. Object.keys(value)[0]
  287. }", this should never happen!`
  288. );
  289. } else if (typeof schema[key].type === "function") {
  290. const Type = schema[key].type;
  291. const castValue = new Type(value);
  292. if (schema[key].validate)
  293. await schema[key].validate(castValue).catch(err => {
  294. throw new Error(`Invalid value for ${key}, ${err}`);
  295. });
  296. castQuery[key] = castValue;
  297. } else throw new Error(`Invalid schema type for ${key}`);
  298. }
  299. });
  300. return { castQuery, restricted };
  301. }
  302. // TODO hide sensitive fields
  303. // TODO improve caching
  304. // TODO add option to only request certain fields
  305. // TODO add support for computed fields
  306. // TODO parse query - validation
  307. // TODO add proper typescript support
  308. // TODO add proper jsdoc
  309. // TODO add support for enum document attributes
  310. // TODO add support for array document attributes
  311. // TODO add support for reference document attributes
  312. // TODO prevent caching if requiring restricted values
  313. // TODO fix 2nd layer of schema
  314. /**
  315. * find - Get one or more document(s) from a single collection
  316. *
  317. * @param payload - Payload
  318. * @returns Returned object
  319. */
  320. public find<T extends keyof Collections>(
  321. context: JobContext,
  322. {
  323. collection, // Collection name
  324. query, // Similar to MongoDB query
  325. values, // TODO: Add support
  326. limit = 1, // TODO have limit off by default?
  327. page = 1,
  328. useCache = true
  329. }: {
  330. collection: T;
  331. query: Record<string, any>;
  332. values?: Record<string, any>;
  333. limit?: number;
  334. page?: number;
  335. useCache?: boolean;
  336. }
  337. ): Promise<any | null> {
  338. return new Promise((resolve, reject) => {
  339. let queryHash: string | null = null;
  340. let cacheable = useCache !== false;
  341. async.waterfall(
  342. [
  343. // Verify whether the collection exists
  344. async () => {
  345. if (!collection)
  346. throw new Error("No collection specified");
  347. if (this.collections && !this.collections[collection])
  348. throw new Error("Collection not found");
  349. },
  350. // Verify whether the query is valid-enough to continue
  351. async () =>
  352. this.parseQuery(
  353. query,
  354. this.collections![collection].schema.document
  355. ),
  356. // If we can use cache, get from the cache, and if we get results return those
  357. async ({ castQuery, restricted }: any) => {
  358. // If we're allowed to cache, and the query doesn't reference any restricted fields, try to cache the query and its response
  359. if (cacheable && !restricted) {
  360. // Turn the query object into a sha1 hash that can be used as a Redis key
  361. queryHash = hash(
  362. { collection, castQuery, values, limit, page },
  363. {
  364. algorithm: "sha1"
  365. }
  366. );
  367. // Check if the query hash already exists in Redis, and get it if it is
  368. const cachedQuery = await this.redis?.GET(
  369. `query.find.${queryHash}`
  370. );
  371. // Return the castQuery along with the cachedDocuments, if any
  372. return {
  373. castQuery,
  374. cachedDocuments: cachedQuery
  375. ? JSON.parse(cachedQuery)
  376. : null
  377. };
  378. }
  379. return { castQuery, cachedDocuments: null };
  380. },
  381. // If we didn't get documents from the cache, get them from mongo
  382. async ({ castQuery, cachedDocuments }: any) => {
  383. if (cachedDocuments) {
  384. cacheable = false;
  385. return cachedDocuments;
  386. }
  387. const getFindValues = async (object: any) => {
  388. const find: any = {};
  389. await async.each(
  390. Object.entries(object),
  391. async ([key, value]) => {
  392. if (
  393. value.type === undefined &&
  394. Object.keys(value).length > 0
  395. ) {
  396. const _find = await getFindValues(
  397. value
  398. );
  399. if (Object.keys(_find).length > 0)
  400. find[key] = _find;
  401. } else if (!value.restricted)
  402. find[key] = true;
  403. }
  404. );
  405. return find;
  406. };
  407. const find: any = await getFindValues(
  408. this.collections![collection].schema.document
  409. );
  410. return this.collections?.[collection].model
  411. .find(castQuery, find)
  412. .limit(limit)
  413. .skip((page - 1) * limit);
  414. },
  415. // Convert documents from Mongoose model to regular objects
  416. async (documents: any[]) =>
  417. async.map(documents, async (document: any) => {
  418. const { castQuery } = await this.parseQuery(
  419. document._doc || document,
  420. this.collections![collection].schema.document,
  421. { operators: false }
  422. );
  423. return castQuery;
  424. }),
  425. // Add documents to the cache
  426. async (documents: any[]) => {
  427. // Adds query results to cache but doesnt await
  428. if (cacheable && queryHash) {
  429. this.redis!.SET(
  430. `query.find.${queryHash}`,
  431. JSON.stringify(documents),
  432. {
  433. EX: 60
  434. }
  435. );
  436. }
  437. return documents;
  438. }
  439. ],
  440. (err, documents?: any[]) => {
  441. if (err) reject(err);
  442. else if (!documents || documents!.length === 0)
  443. resolve(limit === 1 ? null : []);
  444. else resolve(limit === 1 ? documents![0] : documents);
  445. }
  446. );
  447. });
  448. }
  449. }
  450. export type DataModuleJobs = {
  451. [Property in keyof UniqueMethods<DataModule>]: {
  452. payload: Parameters<UniqueMethods<DataModule>[Property]>[1];
  453. returns: Awaited<ReturnType<UniqueMethods<DataModule>[Property]>>;
  454. };
  455. };