Przeglądaj źródła

feat: Bulk event (un)subscribing

Owen Diffey 1 rok temu
rodzic
commit
7c595b34d9

+ 30 - 0
backend/src/modules/APIModule.ts

@@ -328,6 +328,21 @@ export default class APIModule extends BaseModule {
 			});
 	}
 
+	public async subscribeMany(
+		context: JobContext,
+		payload: { channels: string[] }
+	) {
+		const { channels } = payload;
+
+		await Promise.all(
+			channels.map(channel =>
+				context.executeJob("api", "subscribe", {
+					channel
+				})
+			)
+		);
+	}
+
 	public async unsubscribe(
 		context: JobContext,
 		payload: { channel: string }
@@ -359,6 +374,21 @@ export default class APIModule extends BaseModule {
 		}
 	}
 
+	public async unsubscribeMany(
+		context: JobContext,
+		payload: { channels: string[] }
+	) {
+		const { channels } = payload;
+
+		await Promise.all(
+			channels.map(channel =>
+				context.executeJob("api", "unsubscribe", {
+					channel
+				})
+			)
+		);
+	}
+
 	public async unsubscribeAll(context: JobContext) {
 		const socketId = context.getSocketId();
 

+ 65 - 79
frontend/src/components/AdvancedTable.vue

@@ -212,20 +212,46 @@ const hasCheckboxes = computed(
 );
 const aModalIsOpen = computed(() => Object.keys(activeModals.value).length > 0);
 
+const onUpdatedCallback = ({ doc }) => {
+	const docRow = rows.value.find(_row => _row._id === doc._id);
+	const docRowIndex = rows.value.findIndex(_row => _row._id === doc._id);
+
+	if (!docRow) return;
+
+	rows.value[docRowIndex] = {
+		...docRow,
+		...doc,
+		updated: true
+	};
+};
+
+const onDeletedCallback = ({ oldDoc }) => {
+	const docRow = rows.value.find(_row => _row._id === oldDoc._id);
+	const docRowIndex = rows.value.findIndex(_row => _row._id === oldDoc._id);
+
+	if (!docRow) return;
+
+	rows.value[docRowIndex] = {
+		...docRow,
+		selected: false,
+		removed: true
+	};
+};
+
 const unsubscribe = async (_subscriptions?) => {
 	_subscriptions = _subscriptions ?? subscriptions.value;
 
-	await Promise.allSettled(
-		Object.entries(_subscriptions).map(
-			async ([modelId, { updated, deleted }]) => {
-				await Promise.allSettled([
-					events.unsubscribe(updated),
-					events.unsubscribe(deleted)
-				]);
+	await events.unsubscribeMany(
+		Object.values(_subscriptions).flatMap(({ updated, deleted }) => [
+			updated,
+			deleted
+		])
+	);
 
-				delete subscriptions.value[modelId];
-			}
-		)
+	await Promise.all(
+		Object.keys(_subscriptions).map(async modelId => {
+			delete subscriptions.value[modelId];
+		})
 	);
 };
 
@@ -234,81 +260,41 @@ const subscribe = async () => {
 		JSON.stringify(subscriptions.value)
 	);
 
-	await Promise.allSettled(
-		rows.value.map(async row => {
-			if (subscriptions.value[row._id]) {
-				// console.log(11110, row);
+	await Promise.all(
+		rows.value
+			.filter(row => subscriptions.value[row._id])
+			.map(async row => {
 				delete previousSubscriptions[row._id];
-				return;
-			}
-
-			// console.log(11111, row);
-
-			const updated = await events.subscribe(
-				`model.${props.model}.updated.${row._id}`,
-				({ doc }) => {
-					const docRow = rows.value.find(
-						_row => _row._id === doc._id
-					);
-					const docRowIndex = rows.value.findIndex(
-						_row => _row._id === doc._id
-					);
-					console.log(45634643, docRow, docRowIndex);
+			})
+	);
 
-					if (!docRow) return;
+	const uuids = await events.subscribeMany(
+		Object.fromEntries(
+			rows.value
+				.filter(row => !subscriptions.value[row._id])
+				.flatMap(row => [
+					[
+						`model.${props.model}.updated.${row._id}`,
+						onUpdatedCallback
+					],
+					[
+						`model.${props.model}.deleted.${row._id}`,
+						onDeletedCallback
+					]
+				])
+		)
+	);
 
-					rows.value[docRowIndex] = {
-						...docRow,
-						...doc,
-						updated: true
-					};
-				}
-			);
-			// console.log(11112, updated);
-
-			let deleted;
-
-			try {
-				deleted = await events.subscribe(
-					`model.${props.model}.deleted.${row._id}`,
-					({ oldDoc }) => {
-						const docRow = rows.value.find(
-							_row => _row._id === oldDoc._id
-						);
-						const docRowIndex = rows.value.findIndex(
-							_row => _row._id === oldDoc._id
-						);
-						console.log(34436, docRow, docRowIndex);
-
-						if (!docRow) return;
-
-						rows.value[docRowIndex] = {
-							...docRow,
-							selected: false,
-							removed: true
-						};
-					}
-				);
-			} catch (error) {
-				console.log(11113, error);
-				unsubscribe([updated]);
+	await Promise.all(
+		Object.entries(uuids).map(async ([channel, uuid]) => {
+			const [, , , event, modelId] =
+				/^([a-z]+)\.([a-z]+)\.([A-z]+)\.?([A-z0-9]+)?$/.exec(channel) ??
+				[];
 
-				throw error;
-			}
-			// console.log(11114, deleted);
-
-			subscriptions.value[row._id] = { updated, deleted };
-			// console.log(
-			// 	11115,
-			// 	Object.entries(subscriptions.value),
-			// 	subscriptions.value,
-			// 	subscriptions.value[row._id],
-			// 	row._id,
-			// 	{ updated, deleted }
-			// );
+			subscriptions.value[modelId] ??= {};
+			subscriptions.value[modelId][event] = uuid;
 		})
 	);
-	console.log(11116, subscriptions.value, previousSubscriptions);
 
 	unsubscribe(previousSubscriptions);
 };

+ 38 - 3
frontend/src/composables/useEvents.ts

@@ -28,11 +28,28 @@ export const useEvents = () => {
 
 		subscriptions.value[uuid] = { channel, callback };
 
-		console.log(11114, uuid, subscriptions.value[uuid]);
-
 		return uuid;
 	};
 
+	const subscribeMany = async channels => {
+		const _subscriptions = await websocketStore.subscribeMany(channels);
+
+		await Promise.all(
+			Object.entries(_subscriptions).map(
+				async ([uuid, { channel, callback }]) => {
+					subscriptions.value[uuid] = { channel, callback };
+				}
+			)
+		);
+
+		return Object.fromEntries(
+			Object.entries(_subscriptions).map(([uuid, { channel }]) => [
+				channel,
+				uuid
+			])
+		);
+	};
+
 	const unsubscribe = async uuid => {
 		if (!subscriptions.value[uuid]) return;
 
@@ -43,6 +60,22 @@ export const useEvents = () => {
 		delete subscriptions.value[uuid];
 	};
 
+	const unsubscribeMany = async uuids => {
+		const _subscriptions = Object.fromEntries(
+			Object.entries(subscriptions.value)
+				.filter(([uuid]) => uuids.includes(uuid))
+				.map(([uuid, { channel }]) => [uuid, channel])
+		);
+
+		await websocketStore.unsubscribeMany(_subscriptions);
+
+		return Promise.all(
+			uuids.map(async uuid => {
+				delete subscriptions.value[uuid];
+			})
+		);
+	};
+
 	onBeforeUnmount(async () => {
 		await Promise.allSettled(
 			Object.keys(subscriptions.value).map(uuid => unsubscribe(uuid))
@@ -61,6 +94,8 @@ export const useEvents = () => {
 		onReady,
 		removeReadyCallback,
 		subscribe,
-		unsubscribe
+		subscribeMany,
+		unsubscribe,
+		unsubscribeMany
 	};
 };

+ 50 - 0
frontend/src/stores/websocket.ts

@@ -50,6 +50,29 @@ export const useWebsocketStore = defineStore("websocket", () => {
 		return uuid;
 	};
 
+	const subscribeMany = async (
+		channels: Record<string, (data?: any) => any>
+	) => {
+		await runJob("api.subscribeMany", {
+			channels: Object.keys(channels).filter(
+				channel => !socketChannels.includes(channel)
+			)
+		});
+
+		return Object.fromEntries(
+			Object.entries(channels).map(([channel, callback]) => {
+				if (!subscriptions.value[channel])
+					subscriptions.value[channel] = {};
+
+				const uuid = utils.guid();
+
+				subscriptions.value[channel][uuid] = callback;
+
+				return [uuid, { channel, callback }];
+			})
+		);
+	};
+
 	const unsubscribe = async (channel: string, uuid: string) => {
 		if (
 			!subscriptions.value[channel] ||
@@ -69,6 +92,31 @@ export const useWebsocketStore = defineStore("websocket", () => {
 			delete subscriptions.value[channel];
 	};
 
+	const unsubscribeMany = async (channels: Record<string, string>) => {
+		await runJob("api.unsubscribeMany", {
+			channels: Object.values(channels).filter(
+				channel =>
+					!socketChannels.includes(channel) &&
+					Object.keys(subscriptions.value[channel]).length <= 1
+			)
+		});
+
+		return Promise.all(
+			Object.entries(channels).map(async ([uuid, channel]) => {
+				if (
+					!subscriptions.value[channel] ||
+					!subscriptions.value[channel][uuid]
+				)
+					return;
+
+				delete subscriptions.value[channel][uuid];
+
+				if (Object.keys(subscriptions.value[channel]).length === 0)
+					delete subscriptions.value[channel];
+			})
+		);
+	};
+
 	const unsubscribeAll = async () => {
 		await runJob("api.unsubscribeAll");
 
@@ -182,7 +230,9 @@ export const useWebsocketStore = defineStore("websocket", () => {
 		subscriptions,
 		runJob,
 		subscribe,
+		subscribeMany,
 		unsubscribe,
+		unsubscribeMany,
 		unsubscribeAll,
 		onReady,
 		removeReadyCallback