瀏覽代碼

refactor: Replaced Promise.all usage with limited concurrency iterator

Owen Diffey 1 年之前
父節點
當前提交
3d8e4360bb

+ 8 - 9
backend/src/BaseModule.ts

@@ -3,6 +3,7 @@ import path from "path";
 import LogBook, { Log } from "@/LogBook";
 import ModuleManager from "@/ModuleManager";
 import Job from "./Job";
+import { forEachIn } from "@/utils/forEachIn";
 
 export enum ModuleStatus {
 	LOADED = "LOADED",
@@ -89,17 +90,15 @@ export default abstract class BaseModule {
 			throw error;
 		}
 
-		await Promise.all(
-			jobs.map(async jobFile => {
-				const { default: Job } = await import(
-					`./modules/${this.constructor.name}/jobs/${jobFile}`
-				);
+		await forEachIn(jobs, async jobFile => {
+			const { default: Job } = await import(
+				`./modules/${this.constructor.name}/jobs/${jobFile}`
+			);
 
-				const jobName = Job.getName();
+			const jobName = Job.getName();
 
-				this._jobs[jobName] = Job;
-			})
-		);
+			this._jobs[jobName] = Job;
+		});
 	}
 
 	/**

+ 8 - 9
backend/src/modules/CacheModule.ts

@@ -1,6 +1,7 @@
 import config from "config";
 import { RedisClientType, createClient } from "redis";
 import BaseModule, { ModuleStatus } from "@/BaseModule";
+import { forEachIn } from "@/utils/forEachIn";
 
 export class CacheModule extends BaseModule {
 	private _redisClient?: RedisClientType;
@@ -109,15 +110,13 @@ export class CacheModule extends BaseModule {
 	}
 
 	public async removeMany(keys: string | string[]) {
-		await Promise.all(
-			(Array.isArray(keys) ? keys : [keys]).map(async pattern => {
-				for await (const key of this._redisClient!.scanIterator({
-					MATCH: pattern
-				})) {
-					await this.remove(key);
-				}
-			})
-		);
+		await forEachIn(Array.isArray(keys) ? keys : [keys], async pattern => {
+			for await (const key of this._redisClient!.scanIterator({
+				MATCH: pattern
+			})) {
+				await this.remove(key);
+			}
+		});
 	}
 
 	public async getTtl(key: string) {

+ 75 - 81
backend/src/modules/DataModule.ts

@@ -11,6 +11,7 @@ import BaseModule, { ModuleStatus } from "@/BaseModule";
 import EventsModule from "./EventsModule";
 import DataModuleJob from "./DataModule/DataModuleJob";
 import Job from "@/Job";
+import { forEachIn } from "@/utils/forEachIn";
 
 export class DataModule extends BaseModule {
 	private _models?: Record<string, Model<any>>;
@@ -135,10 +136,10 @@ export class DataModule extends BaseModule {
 		)
 			return;
 
-		await Promise.all(
-			Object.entries(eventListeners).map(async ([event, callback]) =>
+		await forEachIn(
+			Object.entries(eventListeners),
+			async ([event, callback]) =>
 				EventsModule.subscribe("event", event, callback)
-			)
 		);
 	}
 
@@ -188,47 +189,46 @@ export class DataModule extends BaseModule {
 
 		schema.plugin(updateVersioningPlugin);
 
-		await Promise.all(
-			Object.entries(schema.paths)
-				.filter(
-					([, type]) =>
-						type instanceof SchemaTypes.ObjectId ||
-						(type instanceof SchemaTypes.Array &&
-							type.caster instanceof SchemaTypes.ObjectId)
-				)
-				.map(async ([key, type]) => {
-					const { ref } =
-						(type instanceof SchemaTypes.ObjectId
-							? type?.options
-							: type.caster?.options) ?? {};
-
-					if (ref)
-						schema.path(key).get(value => {
-							if (
-								typeof value === "object" &&
-								type instanceof SchemaTypes.ObjectId
-							)
-								return {
-									_id: value,
-									_name: ref
-								};
-
-							if (
-								Array.isArray(value) &&
-								type instanceof SchemaTypes.Array
-							)
-								return value.map(item =>
-									item === null
-										? null
-										: {
-												_id: item,
-												_name: ref
-										  }
-								);
-
-							return value;
-						});
-				})
+		await forEachIn(
+			Object.entries(schema.paths).filter(
+				([, type]) =>
+					type instanceof SchemaTypes.ObjectId ||
+					(type instanceof SchemaTypes.Array &&
+						type.caster instanceof SchemaTypes.ObjectId)
+			),
+			async ([key, type]) => {
+				const { ref } =
+					(type instanceof SchemaTypes.ObjectId
+						? type?.options
+						: type.caster?.options) ?? {};
+
+				if (ref)
+					schema.path(key).get(value => {
+						if (
+							typeof value === "object" &&
+							type instanceof SchemaTypes.ObjectId
+						)
+							return {
+								_id: value,
+								_name: ref
+							};
+
+						if (
+							Array.isArray(value) &&
+							type instanceof SchemaTypes.Array
+						)
+							return value.map(item =>
+								item === null
+									? null
+									: {
+											_id: item,
+											_name: ref
+									  }
+							);
+
+						return value;
+					});
+			}
 		);
 
 		return this._mongoConnection.model(modelName.toString(), schema);
@@ -257,8 +257,8 @@ export class DataModule extends BaseModule {
 	private async _syncModelIndexes() {
 		if (!this._models) throw new Error("Models not loaded");
 
-		await Promise.all(
-			Object.values(this._models).map(model => model.syncIndexes())
+		await forEachIn(Object.values(this._models), model =>
+			model.syncIndexes()
 		);
 	}
 
@@ -298,15 +298,13 @@ export class DataModule extends BaseModule {
 			throw error;
 		}
 
-		return Promise.all(
-			migrations.map(async migrationFile => {
-				const { default: Migrate }: { default: typeof Migration } =
-					await import(
-						`./DataModule/models/${modelName}/migrations/${migrationFile}`
-					);
-				return new Migrate(this._mongoConnection as Connection);
-			})
-		);
+		return forEachIn(migrations, async migrationFile => {
+			const { default: Migrate }: { default: typeof Migration } =
+				await import(
+					`./DataModule/models/${modelName}/migrations/${migrationFile}`
+				);
+			return new Migrate(this._mongoConnection as Connection);
+		});
 	}
 
 	private async _loadMigrations() {
@@ -314,8 +312,8 @@ export class DataModule extends BaseModule {
 			path.resolve(__dirname, "./DataModule/models/")
 		);
 
-		return Promise.all(
-			models.map(async modelName => this._loadModelMigrations(modelName))
+		return forEachIn(models, async modelName =>
+			this._loadModelMigrations(modelName)
 		);
 	}
 
@@ -332,34 +330,30 @@ export class DataModule extends BaseModule {
 	private async _loadModelJobs() {
 		if (!this._models) throw new Error("Models not loaded");
 
-		await Promise.all(
-			Object.keys(this._models).map(async modelName => {
-				let jobs;
+		await forEachIn(Object.keys(this._models), async modelName => {
+			let jobs;
 
-				try {
-					jobs = await readdir(
-						path.resolve(
-							__dirname,
-							`./${this.constructor.name}/models/${modelName}/jobs/`
-						)
-					);
-				} catch (error) {
-					if (error.code === "ENOENT") return;
-
-					throw error;
-				}
+			try {
+				jobs = await readdir(
+					path.resolve(
+						__dirname,
+						`./${this.constructor.name}/models/${modelName}/jobs/`
+					)
+				);
+			} catch (error) {
+				if (error.code === "ENOENT") return;
 
-				await Promise.all(
-					jobs.map(async jobFile => {
-						const { default: Job } = await import(
-							`./${this.constructor.name}/models/${modelName}/jobs/${jobFile}`
-						);
+				throw error;
+			}
 
-						this._jobs[Job.getName()] = Job;
-					})
+			await forEachIn(jobs, async jobFile => {
+				const { default: Job } = await import(
+					`./${this.constructor.name}/models/${modelName}/jobs/${jobFile}`
 				);
-			})
-		);
+
+				this._jobs[Job.getName()] = Job;
+			});
+		});
 	}
 }
 

+ 3 - 4
backend/src/modules/DataModule/DataModuleJob.ts

@@ -3,6 +3,7 @@ import Job from "@/Job";
 import DataModule from "../DataModule";
 import { JobOptions } from "@/types/JobOptions";
 import { UserModel } from "./models/users/schema";
+import { forEachIn } from "@/utils/forEachIn";
 
 export default abstract class DataModuleJob extends Job {
 	protected static _modelName: string;
@@ -67,10 +68,8 @@ export default abstract class DataModuleJob extends Job {
 		const modelIds = this._payload?.modelIds;
 
 		if (Array.isArray(modelIds)) {
-			await Promise.all(
-				modelIds.map(async _id =>
-					this._context.assertPermission(`${this.getPath()}.${_id}`)
-				)
+			await forEachIn(modelIds, async _id =>
+				this._context.assertPermission(`${this.getPath()}.${_id}`)
 			);
 		}
 

+ 22 - 20
backend/src/modules/DataModule/models/users/jobs/GetModelPermissions.ts

@@ -4,6 +4,7 @@ import DataModule from "@/modules/DataModule";
 import ModuleManager from "@/ModuleManager";
 import GetPermissions from "./GetPermissions";
 import DataModuleJob from "@/modules/DataModule/DataModuleJob";
+import { forEachIn } from "@/utils/forEachIn";
 
 export default class GetModelPermissions extends DataModuleJob {
 	protected static _modelName = "users";
@@ -52,31 +53,32 @@ export default class GetModelPermissions extends DataModuleJob {
 
 		if (modelId && !model) throw new Error("Model not found");
 
-		const jobs = await Promise.all(
-			Object.entries(ModuleManager.getModule("data")?.getJobs() ?? {})
-				.filter(
-					([jobName]) =>
-						jobName.startsWith(modelName.toString()) &&
-						(modelId ? true : !jobName.endsWith("ById"))
-				)
-				.map(async ([jobName, Job]) => {
-					jobName = `data.${jobName}`;
+		const jobs = await forEachIn(
+			Object.entries(
+				ModuleManager.getModule("data")?.getJobs() ?? {}
+			).filter(
+				([jobName]) =>
+					jobName.startsWith(modelName.toString()) &&
+					(modelId ? true : !jobName.endsWith("ById"))
+			),
+			async ([jobName, Job]) => {
+				jobName = `data.${jobName}`;
 
-					let hasPermission = permissions[jobName];
+				let hasPermission = permissions[jobName];
 
-					if (!hasPermission && modelId)
-						hasPermission =
-							permissions[`${jobName}.*`] ||
-							permissions[`${jobName}.${modelId}`];
+				if (!hasPermission && modelId)
+					hasPermission =
+						permissions[`${jobName}.*`] ||
+						permissions[`${jobName}.${modelId}`];
 
-					if (hasPermission) return [jobName, true];
+				if (hasPermission) return [jobName, true];
 
-					if (typeof Job.hasPermission === "function") {
-						hasPermission = await Job.hasPermission(model, user);
-					}
+				if (typeof Job.hasPermission === "function") {
+					hasPermission = await Job.hasPermission(model, user);
+				}
 
-					return [jobName, !!hasPermission];
-				})
+				return [jobName, !!hasPermission];
+			}
 		);
 
 		const modelPermissions = Object.fromEntries(jobs);

+ 20 - 22
backend/src/modules/EventsModule.ts

@@ -2,6 +2,7 @@ import { createClient, RedisClientType } from "redis";
 import config from "config";
 import BaseModule, { ModuleStatus } from "@/BaseModule";
 import WebSocketModule from "./WebSocketModule";
+import { forEachIn } from "@/utils/forEachIn";
 
 export class EventsModule extends BaseModule {
 	private _pubClient?: RedisClientType;
@@ -134,8 +135,8 @@ export class EventsModule extends BaseModule {
 			async message => {
 				if (!this._scheduleCallbacks[message]) return;
 
-				await Promise.all(
-					this._scheduleCallbacks[message].map(callback => callback())
+				await forEachIn(this._scheduleCallbacks[message], callback =>
+					callback()
 				);
 			}
 		);
@@ -184,19 +185,17 @@ export class EventsModule extends BaseModule {
 			message = message.substring(1).substring(0, message.length - 2);
 
 		if (this._subscriptions && this._subscriptions[channel])
-			await Promise.all(
-				this._subscriptions[channel].map(async cb => cb(message))
+			await forEachIn(this._subscriptions[channel], async cb =>
+				cb(message)
 			);
 
 		if (this._pSubscriptions)
-			await Promise.all(
-				Object.entries(this._pSubscriptions)
-					.filter(([subscription]) =>
-						new RegExp(channel).test(subscription)
-					)
-					.map(async ([, callbacks]) =>
-						Promise.all(callbacks.map(async cb => cb(message)))
-					)
+			await forEachIn(
+				Object.entries(this._pSubscriptions).filter(([subscription]) =>
+					new RegExp(channel).test(subscription)
+				),
+				async ([, callbacks]) =>
+					forEachIn(callbacks, async cb => cb(message))
 			);
 
 		if (!this._socketSubscriptions[channel]) return;
@@ -338,8 +337,8 @@ export class EventsModule extends BaseModule {
 	}
 
 	public async subscribeManySocket(channels: string[], socketId: string) {
-		await Promise.all(
-			channels.map(channel => this.subscribeSocket(channel, socketId))
+		await forEachIn(channels, channel =>
+			this.subscribeSocket(channel, socketId)
 		);
 	}
 
@@ -359,18 +358,17 @@ export class EventsModule extends BaseModule {
 	}
 
 	public async unsubscribeManySocket(channels: string[], socketId: string) {
-		await Promise.all(
-			channels.map(channel => this.unsubscribeSocket(channel, socketId))
+		await forEachIn(channels, channel =>
+			this.unsubscribeSocket(channel, socketId)
 		);
 	}
 
 	public async unsubscribeAllSocket(socketId: string) {
-		await Promise.all(
-			Object.entries(this._socketSubscriptions)
-				.filter(([, socketIds]) => socketIds.has(socketId))
-				.map(async ([channel]) =>
-					this.unsubscribeSocket(channel, socketId)
-				)
+		await forEachIn(
+			Object.entries(this._socketSubscriptions).filter(([, socketIds]) =>
+				socketIds.has(socketId)
+			),
+			async ([channel]) => this.unsubscribeSocket(channel, socketId)
 		);
 	}
 

+ 20 - 22
backend/src/modules/EventsModule/jobs/SubscribeMany.ts

@@ -1,6 +1,7 @@
 import Job from "@/Job";
 import EventsModule from "@/modules/EventsModule";
 import { JobOptions } from "@/types/JobOptions";
+import { forEachIn } from "@/utils/forEachIn";
 
 export default class SubscribeMany extends Job {
 	public constructor(payload?: unknown, options?: JobOptions) {
@@ -21,28 +22,25 @@ export default class SubscribeMany extends Job {
 	}
 
 	protected override async _authorize() {
-		await Promise.all(
-			this._payload.channels.map(async (channel: string) => {
-				const [, moduleName, modelName, event, modelId] =
-					/^([a-z]+)\.([a-z]+)\.([A-z]+)\.?([A-z0-9]+)?$/.exec(
-						channel
-					) ?? [];
-
-				let permission = `event.${channel}`;
-
-				if (
-					moduleName === "model" &&
-					modelName &&
-					(modelId || event === "created")
-				) {
-					if (event === "created")
-						permission = `event.model.${modelName}.created`;
-					else permission = `data.${modelName}.findById.${modelId}`;
-				}
-
-				await this._context.assertPermission(permission);
-			})
-		);
+		await forEachIn(this._payload.channels, async (channel: string) => {
+			const [, moduleName, modelName, event, modelId] =
+				/^([a-z]+)\.([a-z]+)\.([A-z]+)\.?([A-z0-9]+)?$/.exec(channel) ??
+				[];
+
+			let permission = `event.${channel}`;
+
+			if (
+				moduleName === "model" &&
+				modelName &&
+				(modelId || event === "created")
+			) {
+				if (event === "created")
+					permission = `event.model.${modelName}.created`;
+				else permission = `data.${modelName}.findById.${modelId}`;
+			}
+
+			await this._context.assertPermission(permission);
+		});
 	}
 
 	protected async _execute() {

+ 13 - 14
backend/src/modules/WebSocketModule.ts

@@ -11,6 +11,7 @@ import DataModule from "./DataModule";
 import { UserModel } from "./DataModule/models/users/schema";
 import { SessionModel } from "./DataModule/models/sessions/schema";
 import EventsModule from "./EventsModule";
+import { forEachIn } from "@/utils/forEachIn";
 
 export class WebSocketModule extends BaseModule {
 	private _httpServer?: Server;
@@ -84,20 +85,18 @@ export class WebSocketModule extends BaseModule {
 		if (!this._wsServer) return;
 
 		for await (const clients of this._wsServer.clients.entries()) {
-			await Promise.all(
-				clients.map(async socket => {
-					switch (socket.readyState) {
-						case socket.OPEN:
-							socket.ping();
-							break;
-						case socket.CLOSED:
-							socket.terminate();
-							break;
-						default:
-							break;
-					}
-				})
-			);
+			await forEachIn(clients, async socket => {
+				switch (socket.readyState) {
+					case socket.OPEN:
+						socket.ping();
+						break;
+					case socket.CLOSED:
+						socket.terminate();
+						break;
+					default:
+						break;
+				}
+			});
 		}
 	}
 

+ 38 - 0
backend/src/utils/forEachIn.ts

@@ -0,0 +1,38 @@
+export const forEachIn = async <
+	ItemsType extends Array<any>,
+	CallbackType extends (
+		item: ItemsType[number],
+		index: number
+	) => Promise<any>,
+	CallbackReturnType = Awaited<ReturnType<CallbackType>>
+>(
+	items: ItemsType,
+	callback: CallbackType,
+	concurrency = 10
+): Promise<CallbackReturnType[]> => {
+	const queued = items.slice();
+	const failed: any[] = []; // TODO: Report these errors and abortOnError option
+	const completed: CallbackReturnType[] = [];
+
+	const next = async () => {
+		const [item] = queued.splice(0, 1);
+
+		if (typeof item === "undefined") return;
+
+		const index = items.indexOf(item);
+
+		try {
+			completed[index] = await callback(item, index);
+		} catch (error) {
+			failed[index] = error;
+		}
+
+		await next();
+	};
+
+	await Promise.all(
+		Array.from(Array(Math.min(items.length, concurrency)).keys()).map(next)
+	);
+
+	return completed;
+};