|
@@ -28,80 +28,67 @@ export default class DataModule extends BaseModule {
|
|
|
return new Promise((resolve, reject) => {
|
|
|
async.waterfall(
|
|
|
[
|
|
|
- (next: any) => {
|
|
|
- super
|
|
|
- .startup()
|
|
|
- .then(() => next())
|
|
|
- .catch(next);
|
|
|
+ async () => super.startup(),
|
|
|
+
|
|
|
+ async () => {
|
|
|
+ const mongoUrl = config.get<string>("mongo.url");
|
|
|
+
|
|
|
+ return mongoose.connect(mongoUrl);
|
|
|
},
|
|
|
|
|
|
- (next: any) => {
|
|
|
- const mongoUrl = config.get<any>("mongo").url;
|
|
|
- mongoose
|
|
|
- .connect(mongoUrl)
|
|
|
- .then(() => {
|
|
|
- this.loadCollections().then(() => {
|
|
|
- if (this.collections) {
|
|
|
- Object.values(this.collections).forEach(
|
|
|
- collection =>
|
|
|
- collection.model.syncIndexes()
|
|
|
- );
|
|
|
- next();
|
|
|
- } else
|
|
|
- next(
|
|
|
- new Error(
|
|
|
- "Collections have not been loaded"
|
|
|
- )
|
|
|
- );
|
|
|
- });
|
|
|
- })
|
|
|
- .catch(next);
|
|
|
+ async () => this.loadCollections(),
|
|
|
+
|
|
|
+ async () => {
|
|
|
+ if (this.collections) {
|
|
|
+ Object.values(this.collections).forEach(
|
|
|
+ collection => collection.model.syncIndexes()
|
|
|
+ );
|
|
|
+ } else
|
|
|
+ throw new Error("Collections have not been loaded");
|
|
|
},
|
|
|
|
|
|
- (next: any) => {
|
|
|
- const { url, password } = config.get<any>("redis");
|
|
|
+ async () => {
|
|
|
+ const { url, password } = config.get<{
|
|
|
+ url: string;
|
|
|
+ password: string;
|
|
|
+ }>("redis");
|
|
|
+
|
|
|
this.redis = createClient({
|
|
|
url,
|
|
|
password
|
|
|
});
|
|
|
- this.redis
|
|
|
- .connect()
|
|
|
- .then(() => next())
|
|
|
- .catch(next);
|
|
|
+
|
|
|
+ return this.redis.connect();
|
|
|
},
|
|
|
|
|
|
- (next: any) => {
|
|
|
- if (this.redis)
|
|
|
- this.redis
|
|
|
- .sendCommand([
|
|
|
- "CONFIG",
|
|
|
- "GET",
|
|
|
- "notify-keyspace-events"
|
|
|
- ])
|
|
|
- .then(res => {
|
|
|
- if (
|
|
|
- !(Array.isArray(res) && res[1] === "xE")
|
|
|
- )
|
|
|
- next(
|
|
|
- new Error(
|
|
|
- `notify-keyspace-events is NOT configured correctly! It is set to: ${
|
|
|
- (Array.isArray(res) &&
|
|
|
- res[1]) ||
|
|
|
- "unknown"
|
|
|
- }`
|
|
|
- )
|
|
|
- );
|
|
|
- else next();
|
|
|
- })
|
|
|
- .catch(next);
|
|
|
- else
|
|
|
- next(new Error("Redis connection not established"));
|
|
|
+ async () => {
|
|
|
+ if (!this.redis)
|
|
|
+ throw new Error("Redis connection not established");
|
|
|
+
|
|
|
+ return this.redis.sendCommand([
|
|
|
+ "CONFIG",
|
|
|
+ "GET",
|
|
|
+ "notify-keyspace-events"
|
|
|
+ ]);
|
|
|
},
|
|
|
|
|
|
- (next: any) => {
|
|
|
- super.started();
|
|
|
- next();
|
|
|
- }
|
|
|
+ async (redisConfigResponse: string[]) => {
|
|
|
+ if (
|
|
|
+ !(
|
|
|
+ Array.isArray(redisConfigResponse) &&
|
|
|
+ redisConfigResponse[1] === "xE"
|
|
|
+ )
|
|
|
+ )
|
|
|
+ throw new Error(
|
|
|
+ `notify-keyspace-events is NOT configured correctly! It is set to: ${
|
|
|
+ (Array.isArray(redisConfigResponse) &&
|
|
|
+ redisConfigResponse[1]) ||
|
|
|
+ "unknown"
|
|
|
+ }`
|
|
|
+ );
|
|
|
+ },
|
|
|
+
|
|
|
+ async () => super.started()
|
|
|
],
|
|
|
err => {
|
|
|
if (err) reject(err);
|
|
@@ -181,6 +168,15 @@ export default class DataModule extends BaseModule {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ // TODO decide on whether to throw an exception if no results found, possible configurable via param
|
|
|
+ // TODO hide sensitive fields
|
|
|
+ // TOOD don't store sensitive fields in cache
|
|
|
+ // TODO improve caching
|
|
|
+ // TODO add option to only request certain fields
|
|
|
+ // TODO add support for computed fields
|
|
|
+ // TODO parse query
|
|
|
+ // TODO add proper typescript support
|
|
|
+ // TODO add proper jsdoc
|
|
|
/**
|
|
|
* find - Find data
|
|
|
*
|
|
@@ -196,109 +192,159 @@ export default class DataModule extends BaseModule {
|
|
|
collection,
|
|
|
query,
|
|
|
values, // TODO: Add support
|
|
|
- limit = 1, // TODO: Add pagination
|
|
|
- cache = 60
|
|
|
+ limit = 1, // TODO have limit off by default?
|
|
|
+ page = 1,
|
|
|
+ useCache = true,
|
|
|
+ convertArrayToSingle = false
|
|
|
}: {
|
|
|
collection: T;
|
|
|
query: Record<string, any>;
|
|
|
values?: Record<string, any>;
|
|
|
limit?: number;
|
|
|
- cache?: number;
|
|
|
+ page?: number;
|
|
|
+ useCache?: boolean;
|
|
|
+ convertArrayToSingle?: boolean;
|
|
|
}): Promise<any> {
|
|
|
return new Promise((resolve, reject) => {
|
|
|
- if (
|
|
|
- this.redis &&
|
|
|
- this.collections &&
|
|
|
- this.collections[collection]
|
|
|
- ) {
|
|
|
- async.waterfall(
|
|
|
- [
|
|
|
- (next: any) => {
|
|
|
- const idsProvided: any = []; // TODO: Handle properly (e.g. one missing $in causes duplicate or many queries with mixed/no _id)
|
|
|
- (
|
|
|
- (query._id && query._id.$in) || [query._id]
|
|
|
- ).forEach((queryId: any) =>
|
|
|
- idsProvided.push(queryId.toString())
|
|
|
+ let addToCache = false;
|
|
|
+ let cacheKeyName: string | null = null;
|
|
|
+
|
|
|
+ async.waterfall(
|
|
|
+ [
|
|
|
+ // Verify whether the collection exists
|
|
|
+ async () => {
|
|
|
+ if (!collection)
|
|
|
+ throw new Error("No collection specified");
|
|
|
+ if (this.collections && !this.collections[collection])
|
|
|
+ throw new Error("Collection not found");
|
|
|
+ },
|
|
|
+
|
|
|
+ // Verify whether the query is valid-enough to continue
|
|
|
+ async () => {
|
|
|
+ if (
|
|
|
+ !query ||
|
|
|
+ typeof query !== "object" ||
|
|
|
+ Object.keys(query).length === 0
|
|
|
+ )
|
|
|
+ new Error(
|
|
|
+ "Invalid query provided. Query must be an object."
|
|
|
+ );
|
|
|
+ },
|
|
|
+
|
|
|
+ // If we can use cache, get from the cache, and if we get results return those, otherwise return null
|
|
|
+ async () => {
|
|
|
+ // Not using cache, so return
|
|
|
+ if (!useCache) return null;
|
|
|
+ // More than one query key, so impossible to get from cache
|
|
|
+ if (Object.keys(query).length > 1) return null;
|
|
|
+
|
|
|
+ // First key and only key in query object
|
|
|
+ const queryPropertyName = Object.keys(query)[0];
|
|
|
+ // Corresponding property from schema document
|
|
|
+ const documentProperty =
|
|
|
+ this.collections![collection].schema.document[
|
|
|
+ queryPropertyName
|
|
|
+ ];
|
|
|
+
|
|
|
+ if (!documentProperty)
|
|
|
+ throw new Error(
|
|
|
+ `Query property ${queryPropertyName} not found in document.`
|
|
|
);
|
|
|
- const cached: any = [];
|
|
|
- if (cache === -1 || idsProvided.length === 0)
|
|
|
- next(null, cached, idsProvided);
|
|
|
- else {
|
|
|
- async.each(
|
|
|
- idsProvided,
|
|
|
- (queryId, _next) => {
|
|
|
- this.redis
|
|
|
- ?.GET(`${collection}.${queryId}`)
|
|
|
- .then((cacheValue: any) => {
|
|
|
- if (cacheValue)
|
|
|
- cached.push(
|
|
|
- JSON.parse(cacheValue) // TODO: Convert _id to ObjectId
|
|
|
- );
|
|
|
- _next();
|
|
|
- })
|
|
|
- .catch(_next);
|
|
|
- },
|
|
|
- err => next(err, cached, idsProvided)
|
|
|
- );
|
|
|
- }
|
|
|
- },
|
|
|
-
|
|
|
- (cached: any, idsProvided: any, next: any) => {
|
|
|
- if (idsProvided.length === cached.length)
|
|
|
- next(null, [], cached);
|
|
|
- else
|
|
|
- this.collections?.[collection].model
|
|
|
- .find(query)
|
|
|
- .limit(limit)
|
|
|
- .exec((err: any, res: any) => {
|
|
|
- if (
|
|
|
- err ||
|
|
|
- (res.length === 0 &&
|
|
|
- cached.length === 0)
|
|
|
- )
|
|
|
- next(
|
|
|
- new Error(
|
|
|
- err || "No results found"
|
|
|
- )
|
|
|
- );
|
|
|
- else {
|
|
|
- next(null, res, cached);
|
|
|
+ // If query name is not a cache key, just continue
|
|
|
+ if (!documentProperty.cacheKey) return null;
|
|
|
+
|
|
|
+ const values = [];
|
|
|
+ if (
|
|
|
+ Object.prototype.hasOwnProperty.call(
|
|
|
+ query[queryPropertyName],
|
|
|
+ "$in"
|
|
|
+ )
|
|
|
+ )
|
|
|
+ values.push(...query[queryPropertyName].$in);
|
|
|
+ else values.push(query[queryPropertyName]);
|
|
|
+
|
|
|
+ const cachedDocuments: any[] = [];
|
|
|
+
|
|
|
+ await async.each(values, async value =>
|
|
|
+ this.redis
|
|
|
+ ?.GET(
|
|
|
+ `${collection}.${queryPropertyName}.${value.toString()}`
|
|
|
+ )
|
|
|
+ .then((cachedDocument: any) => {
|
|
|
+ if (cachedDocument)
|
|
|
+ cachedDocuments.push(
|
|
|
+ JSON.parse(cachedDocument)
|
|
|
+ );
|
|
|
+ })
|
|
|
+ );
|
|
|
+
|
|
|
+ // TODO optimize this
|
|
|
+ if (cachedDocuments.length !== values.length) {
|
|
|
+ addToCache = true;
|
|
|
+ cacheKeyName = queryPropertyName;
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ return cachedDocuments;
|
|
|
+ },
|
|
|
+
|
|
|
+ // If we didn't get documents from the cache, get them from mongo
|
|
|
+ async (cachedDocuments: any[] | null) => {
|
|
|
+ if (cachedDocuments) return cachedDocuments;
|
|
|
+
|
|
|
+ return this.collections?.[collection].model
|
|
|
+ .find(query)
|
|
|
+ .limit(limit)
|
|
|
+ .skip((page - 1) * limit);
|
|
|
+ },
|
|
|
+
|
|
|
+ // Convert documents from Mongoose model to regular objects, and if we got no documents throw an error
|
|
|
+ async (documents: any[]) => {
|
|
|
+ if (documents.length === 0)
|
|
|
+ throw new Error("No results found.");
|
|
|
+
|
|
|
+ return documents.map(document => {
|
|
|
+ if (!document._doc) return document;
|
|
|
+
|
|
|
+ const rawDocument = document._doc;
|
|
|
+ rawDocument._id = rawDocument._id.toString();
|
|
|
+ return rawDocument;
|
|
|
+ });
|
|
|
+ },
|
|
|
+
|
|
|
+ // Add documents to the cache
|
|
|
+ async (documents: any[]) => {
|
|
|
+ // TODO only add new things to cache
|
|
|
+ // Adds the fetched documents to the cache, but doesn't wait for it to complete
|
|
|
+ if (addToCache && cacheKeyName) {
|
|
|
+ async.each(
|
|
|
+ documents,
|
|
|
+ // TODO verify that the cache key name property actually exists for these documents
|
|
|
+ async (document: any) =>
|
|
|
+ this.redis!.SET(
|
|
|
+ `${collection}.${cacheKeyName}.${document[
|
|
|
+ cacheKeyName!
|
|
|
+ ].toString()}`,
|
|
|
+ JSON.stringify(document),
|
|
|
+ {
|
|
|
+ EX: 60
|
|
|
}
|
|
|
- });
|
|
|
- },
|
|
|
-
|
|
|
- (response: any, cached: any, next: any) => {
|
|
|
- if (cache > -1 && response.length > 0)
|
|
|
- async.each(
|
|
|
- response,
|
|
|
- (res: any, _next) => {
|
|
|
- this.redis
|
|
|
- ?.SET(
|
|
|
- `${collection}.${res._id.toString()}`,
|
|
|
- JSON.stringify(res)
|
|
|
- )
|
|
|
- .then(() => {
|
|
|
- this.redis
|
|
|
- ?.EXPIRE(
|
|
|
- `${collection}.${res._id.toString()}`,
|
|
|
- cache
|
|
|
- )
|
|
|
- .then(() => _next())
|
|
|
- .catch(_next);
|
|
|
- })
|
|
|
- .catch(_next);
|
|
|
- },
|
|
|
- err => next(err, [...response, ...cached])
|
|
|
- );
|
|
|
- else next(null, [...response, ...cached]);
|
|
|
+ )
|
|
|
+ );
|
|
|
}
|
|
|
- ],
|
|
|
- (err, res: any) => {
|
|
|
- if (err) reject(err);
|
|
|
- else resolve(res.length === 1 ? res[0] : res);
|
|
|
+
|
|
|
+ return documents;
|
|
|
}
|
|
|
- );
|
|
|
- } else reject(new Error(`Collection "${collection}" not loaded`));
|
|
|
+ ],
|
|
|
+ (err, documents?: any[]) => {
|
|
|
+ if (err) reject(err);
|
|
|
+ else if (convertArrayToSingle)
|
|
|
+ resolve(
|
|
|
+ documents!.length === 1 ? documents![0] : documents
|
|
|
+ );
|
|
|
+ else resolve(documents);
|
|
|
+ }
|
|
|
+ );
|
|
|
});
|
|
|
}
|
|
|
}
|