فهرست منبع

feat: Adds useEvents composable

Owen Diffey 1 سال پیش
والد
کامیت
b9bec7b19f

+ 97 - 50
frontend/src/components/AdvancedTable.vue

@@ -23,6 +23,7 @@ import {
 	TableFilter,
 	TableBulkActions
 } from "@/types/advancedTable";
+import { useEvents } from "@/composables/useEvents";
 
 const { dragBox, setInitialBox, onDragBox, resetBoxPosition } = useDragBox();
 
@@ -76,7 +77,9 @@ const router = useRouter();
 const modalsStore = useModalsStore();
 const { activeModals } = storeToRefs(modalsStore);
 
-const websocketStore = useWebsocketStore();
+const { runJob } = useWebsocketStore();
+
+const events = useEvents();
 
 const page = ref(1);
 const pageSize = ref(10);
@@ -177,10 +180,15 @@ const columnOrderChangedDebounceTimeout = ref();
 const lastSelectedItemIndex = ref(0);
 const bulkPopup = ref();
 const rowElements = ref([]);
-const subscriptions = ref({
-	updated: new Set(),
-	deleted: new Set()
-});
+const subscriptions = ref<
+	Record<
+		string,
+		{
+			updated: string;
+			deleted: string;
+		}
+	>
+>({});
 
 const lastPage = computed(() => Math.ceil(count.value / pageSize.value));
 const sortedFilteredColumns = computed(() =>
@@ -208,66 +216,105 @@ const unsubscribe = async (_subscriptions?) => {
 	_subscriptions = _subscriptions ?? subscriptions.value;
 
 	await Promise.allSettled(
-		Object.entries(_subscriptions).map(async ([event, modelIds]) => {
-			for await (const modelId of modelIds.values()) {
-				await websocketStore.unsubscribe(
-					`model.${props.model}.${event}.${modelId}`,
-					subscribe
-				);
-
-				subscriptions.value[event].delete(modelId);
+		Object.entries(_subscriptions).map(
+			async ([modelId, { updated, deleted }]) => {
+				await Promise.allSettled([
+					events.unsubscribe(updated),
+					events.unsubscribe(deleted)
+				]);
+
+				delete subscriptions.value[modelId];
 			}
-		})
+		)
 	);
 };
 
 const subscribe = async () => {
-	const previousSubscriptions = subscriptions.value;
+	const previousSubscriptions = JSON.parse(
+		JSON.stringify(subscriptions.value)
+	);
 
 	await Promise.allSettled(
-		rows.value.map((row, index) =>
-			Object.entries(subscriptions.value).map(
-				async ([event, modelIds]) => {
-					if (modelIds.has(row._id)) {
-						previousSubscriptions[event].delete(row._id);
-						return;
-					}
+		rows.value.map(async row => {
+			if (subscriptions.value[row._id]) {
+				// console.log(11110, row);
+				delete previousSubscriptions[row._id];
+				return;
+			}
 
-					await websocketStore.subscribe(
-						`model.${props.model}.${event}.${row._id}`,
-						({ doc }) => {
-							switch (event) {
-								case "updated":
-									rows.value[index] = {
-										...row,
-										...doc,
-										updated: true
-									};
-									break;
-								case "deleted":
-									rows.value[index] = {
-										...row,
-										selected: false,
-										removed: true
-									};
-									break;
-								default:
-									break;
-							}
-						}
+			// console.log(11111, row);
+
+			const updated = await events.subscribe(
+				`model.${props.model}.updated.${row._id}`,
+				({ doc }) => {
+					const docRow = rows.value.find(
+						_row => _row._id === doc._id
+					);
+					const docRowIndex = rows.value.findIndex(
+						_row => _row._id === doc._id
 					);
+					console.log(45634643, docRow, docRowIndex);
 
-					subscriptions.value[event].add(row._id);
+					if (!docRow) return;
+
+					rows.value[docRowIndex] = {
+						...docRow,
+						...doc,
+						updated: true
+					};
 				}
-			)
-		)
+			);
+			// console.log(11112, updated);
+
+			let deleted;
+
+			try {
+				deleted = await events.subscribe(
+					`model.${props.model}.deleted.${row._id}`,
+					({ doc }) => {
+						const docRow = rows.value.find(
+							_row => _row._id === doc._id
+						);
+						const docRowIndex = rows.value.findIndex(
+							_row => _row._id === doc._id
+						);
+						console.log(34436, docRow, docRowIndex);
+
+						if (!docRow) return;
+
+						rows.value[docRowIndex] = {
+							...docRow,
+							selected: false,
+							removed: true
+						};
+					}
+				);
+			} catch (error) {
+				console.log(11113, error);
+				unsubscribe([updated]);
+
+				throw error;
+			}
+			// console.log(11114, deleted);
+
+			subscriptions.value[row._id] = { updated, deleted };
+			// console.log(
+			// 	11115,
+			// 	Object.entries(subscriptions.value),
+			// 	subscriptions.value,
+			// 	subscriptions.value[row._id],
+			// 	row._id,
+			// 	{ updated, deleted }
+			// );
+		})
 	);
+	console.log(11116, subscriptions.value, previousSubscriptions);
 
 	unsubscribe(previousSubscriptions);
 };
 
 const getData = async () => {
-	const data = await websocketStore.runJob(`data.${props.model}.getData`, {
+	const data = await runJob(`data.${props.model}.getData`, {
 		page: page.value,
 		pageSize: pageSize.value,
 		properties: properties.value,
@@ -946,7 +993,7 @@ onMounted(async () => {
 		}
 	}
 
-	await websocketStore.onReady(async () => {
+	await events.onReady(async () => {
 		await getData();
 
 		if (props.query) setQuery();
@@ -954,7 +1001,7 @@ onMounted(async () => {
 		await Promise.allSettled(
 			props.filters.map(async filter => {
 				if (filter.autosuggest && filter.autosuggestDataAction) {
-					const { items } = await websocketStore.runJob(
+					const { items } = await runJob(
 						filter.autosuggestDataAction
 					);
 					autosuggest.value.allItems[filter.name] = items;

+ 26 - 24
frontend/src/components/modals/EditNews.vue

@@ -4,10 +4,10 @@ import { marked } from "marked";
 import DOMPurify from "dompurify";
 import Toast from "toasters";
 import { formatDistance } from "date-fns";
-import { useWebsocketStore } from "@/stores/websocket";
 import { useModalsStore } from "@/stores/modals";
 import { useNewsModelStore } from "@/stores/models/news";
 import { useForm } from "@/composables/useForm";
+import { useEvents } from "@/composables/useEvents";
 
 const Modal = defineAsyncComponent(() => import("@/components/Modal.vue"));
 const SaveButton = defineAsyncComponent(
@@ -24,11 +24,12 @@ const props = defineProps({
 	sector: { type: String, default: "admin" }
 });
 
-const { onReady, removeReadyCallback } = useWebsocketStore();
+const { onReady } = useEvents();
 
 const { closeCurrentModal } = useModalsStore();
 
-const { create, findById, updateById, unregisterModels } = useNewsModelStore();
+const { create, findById, updateById, hasPermission, unregisterModels } =
+	useNewsModelStore();
 
 const createdBy = ref();
 const createdAt = ref(0);
@@ -96,24 +97,6 @@ const { inputs, save, setOriginalValue } = useForm(
 	}
 );
 
-const onReadyCallback = async () => {
-	if (props.newsId && !props.createNews) {
-		const { value: data } = await findById(props.newsId).catch(() => {
-			new Toast("News with that ID not found.");
-			closeCurrentModal();
-		});
-
-		setOriginalValue({
-			markdown: data.markdown,
-			status: data.status,
-			showToNewUsers: data.showToNewUsers
-		});
-
-		createdBy.value = data.createdBy;
-		createdAt.value = data.createdAt;
-	}
-};
-
 onMounted(async () => {
 	marked.use({
 		renderer: {
@@ -126,13 +109,32 @@ onMounted(async () => {
 		}
 	});
 
-	await onReady(onReadyCallback);
+	await onReady(async () => {
+		if (props.newsId && !props.createNews) {
+			const { value: data } = await findById(props.newsId).catch(() => {
+				new Toast("News with that ID not found.");
+				closeCurrentModal();
+			});
+
+			setOriginalValue({
+				markdown: data.markdown,
+				status: data.status,
+				showToNewUsers: data.showToNewUsers
+			});
+
+			createdBy.value = data.createdBy;
+			createdAt.value = data.createdAt;
+		}
+
+		console.log(
+			43534543,
+			await hasPermission("data.news.published", props.newsId)
+		);
+	});
 });
 
 onBeforeUnmount(async () => {
 	if (props.newsId && !props.createNews) await unregisterModels(props.newsId);
-
-	removeReadyCallback(onReadyCallback);
 });
 </script>
 

+ 66 - 0
frontend/src/composables/useEvents.ts

@@ -0,0 +1,66 @@
+import { onBeforeUnmount, ref } from "vue";
+import { useWebsocketStore } from "@/stores/websocket";
+
+export const useEvents = () => {
+	const websocketStore = useWebsocketStore();
+
+	const readySubscriptions = ref({});
+	const subscriptions = ref({});
+
+	const onReady = async callback => {
+		const uuid = await websocketStore.onReady(callback);
+
+		readySubscriptions.value[uuid] = { callback };
+
+		return uuid;
+	};
+
+	const removeReadyCallback = uuid => {
+		if (!readySubscriptions.value[uuid]) return;
+
+		websocketStore.removeReadyCallback(uuid);
+
+		delete readySubscriptions.value[uuid];
+	};
+
+	const subscribe = async (channel, callback) => {
+		const uuid = await websocketStore.subscribe(channel, callback);
+
+		subscriptions.value[uuid] = { channel, callback };
+
+		console.log(11114, uuid, subscriptions.value[uuid]);
+
+		return uuid;
+	};
+
+	const unsubscribe = async uuid => {
+		if (!subscriptions.value[uuid]) return;
+
+		const { channel } = subscriptions.value[uuid];
+
+		await websocketStore.unsubscribe(channel, uuid);
+
+		delete subscriptions.value[uuid];
+	};
+
+	onBeforeUnmount(async () => {
+		await Promise.allSettled(
+			Object.keys(subscriptions.value).map(uuid => unsubscribe(uuid))
+		);
+
+		await Promise.allSettled(
+			Object.keys(readySubscriptions.value).map(async uuid =>
+				removeReadyCallback(uuid)
+			)
+		);
+	});
+
+	return {
+		readySubscriptions,
+		subscriptions,
+		onReady,
+		removeReadyCallback,
+		subscribe,
+		unsubscribe
+	};
+};

+ 13 - 14
frontend/src/pages/News.vue

@@ -13,6 +13,7 @@ import {
 import { GetPublishedNewsResponse } from "@musare_types/actions/NewsActions";
 import { useWebsocketStore } from "@/stores/websocket";
 import { useNewsModelStore } from "@/stores/models/news";
+import { useEvents } from "@/composables/useEvents";
 
 const MainHeader = defineAsyncComponent(
 	() => import("@/components/MainHeader.vue")
@@ -24,20 +25,18 @@ const UserLink = defineAsyncComponent(
 	() => import("@/components/UserLink.vue")
 );
 
-const { onReady, subscribe, unsubscribe, removeReadyCallback } =
-	useWebsocketStore();
+const { onReady, subscribe } = useEvents();
 const { newest, registerModels, unregisterModels } = useNewsModelStore();
 
 const news = ref<NewsModel[]>([]);
 
 const { sanitize } = DOMPurify;
 
-const onCreated = async ({ doc }) => {
-	news.value.unshift(...(await registerModels(doc)));
-};
-
-const onReadyCallback = async () => {
-	news.value = await newest();
+const onDeleted = async ({ oldDoc }) => {
+	news.value.splice(
+		news.value.findIndex(doc => doc._id === oldDoc._id),
+		1
+	);
 };
 
 onMounted(async () => {
@@ -52,9 +51,13 @@ onMounted(async () => {
 		}
 	});
 
-	await onReady(onReadyCallback);
+	await onReady(async () => {
+		news.value = await newest();
+	});
 
-	await subscribe("model.news.created", onCreated);
+	await subscribe("model.news.created", async ({ doc }) => {
+		news.value.unshift(...(await registerModels(doc)));
+	});
 
 	// TODO: Subscribe to loaded model updated/deleted events
 	// socket.on("event:news.updated", (res: NewsUpdatedResponse) => {
@@ -81,10 +84,6 @@ onMounted(async () => {
 
 onBeforeUnmount(async () => {
 	await unregisterModels(news.value.map(model => model.value._id));
-
-	await unsubscribe("model.news.created", onCreated);
-
-	removeReadyCallback(onReadyCallback);
 });
 </script>
 

+ 26 - 17
frontend/src/stores/models/model.ts

@@ -7,6 +7,7 @@ export const createModelStore = modelName => {
 	const models = ref<Ref<any>[]>([]);
 	const permissions = ref(null);
 	const modelPermissions = ref({});
+	const subscriptions = ref({});
 
 	const onUpdated = async ({ doc }) => {
 		const index = models.value.findIndex(
@@ -33,15 +34,26 @@ export const createModelStore = modelName => {
 				if (!models.value.find(model => model.value._id === _doc._id))
 					models.value.push(docRef);
 
-				await subscribe(
-					`model.${modelName}.updated.${_doc._id}`,
-					onUpdated
-				);
-
-				await subscribe(
-					`model.${modelName}.deleted.${_doc._id}`,
-					onDeleted
-				);
+				const updatedChannel = `model.${modelName}.updated.${_doc._id}`;
+				const updatedUuid = await subscribe(updatedChannel, onUpdated);
+				const updated = {
+					channel: updatedChannel,
+					callback: onUpdated,
+					uuid: updatedUuid
+				};
+
+				const deletedChannel = `model.${modelName}.updated.${_doc._id}`;
+				const deletedUuid = await subscribe(deletedChannel, onDeleted);
+				const deleted = {
+					channel: deletedChannel,
+					callback: onDeleted,
+					uuid: deletedUuid
+				};
+
+				subscriptions.value[_doc._id] = {
+					updated,
+					deleted
+				};
 
 				return docRef;
 			})
@@ -58,15 +70,11 @@ export const createModelStore = modelName => {
 					)
 						return;
 
-					await unsubscribe(
-						`model.${modelName}.updated.${modelId}`,
-						onUpdated
-					);
+					const { updated, deleted } = subscriptions.value[modelId];
 
-					await unsubscribe(
-						`model.${modelName}.deleted.${modelId}`,
-						onDeleted
-					);
+					await unsubscribe(updated.channel, updated.uuid);
+
+					await unsubscribe(deleted.channel, deleted.uuid);
 
 					models.value.splice(
 						models.value.findIndex(
@@ -135,6 +143,7 @@ export const createModelStore = modelName => {
 		models,
 		permissions,
 		modelPermissions,
+		subscriptions,
 		registerModels,
 		unregisterModels,
 		getUserModelPermissions,

+ 37 - 21
frontend/src/stores/websocket.ts

@@ -11,7 +11,7 @@ export const useWebsocketStore = defineStore("websocket", () => {
 
 	const socket = ref();
 	const ready = ref(false);
-	const readyCallbacks = ref(new Set());
+	const readyCallbacks = ref({});
 	const jobCallbacks = ref({});
 	const pendingJobs = ref([]);
 	const subscriptions = ref({});
@@ -41,22 +41,29 @@ export const useWebsocketStore = defineStore("websocket", () => {
 		if (!socketChannels.includes(channel))
 			await runJob("api.subscribe", { channel });
 
-		if (!subscriptions.value[channel])
-			subscriptions.value[channel] = new Set();
+		if (!subscriptions.value[channel]) subscriptions.value[channel] = {};
 
-		subscriptions.value[channel].add(callback);
+		const uuid = utils.guid();
+
+		subscriptions.value[channel][uuid] = callback;
+
+		return uuid;
 	};
 
-	const unsubscribe = async (
-		channel: string,
-		callback: (data?: any) => any
-	) => {
+	const unsubscribe = async (channel: string, uuid: string) => {
 		if (!socketChannels.includes(channel))
 			await runJob("api.unsubscribe", { channel });
 
-		if (!subscriptions.value[channel]) return;
+		if (
+			!subscriptions.value[channel] ||
+			!subscriptions.value[channel][uuid]
+		)
+			return;
+
+		delete subscriptions.value[channel][uuid];
 
-		subscriptions.value[channel].delete(callback);
+		if (Object.keys(subscriptions.value[channel]).length === 0)
+			delete subscriptions.value[channel];
 	};
 
 	const unsubscribeAll = async () => {
@@ -66,12 +73,19 @@ export const useWebsocketStore = defineStore("websocket", () => {
 	};
 
 	const onReady = async (callback: () => any) => {
-		readyCallbacks.value.add(callback);
+		const uuid = utils.guid();
+
+		readyCallbacks.value[uuid] = callback;
+
 		if (ready.value) await callback();
+
+		return uuid;
 	};
 
-	const removeReadyCallback = (callback: () => any) => {
-		readyCallbacks.value.delete(callback);
+	const removeReadyCallback = (uuid: string) => {
+		if (!readyCallbacks.value[uuid]) return;
+
+		delete readyCallbacks.value[uuid];
 	};
 
 	subscribe("ready", async data => {
@@ -91,9 +105,11 @@ export const useWebsocketStore = defineStore("websocket", () => {
 
 		await userAuthStore.updatePermissions();
 
-		for await (const callback of readyCallbacks.value.values()) {
-			await callback().catch(() => {}); // TODO: Error handling
-		}
+		await Promise.all(
+			Object.values(readyCallbacks.value).map(
+				async callback => callback() // TODO: Error handling
+			)
+		);
 
 		await Promise.allSettled(
 			Object.keys(subscriptions.value)
@@ -133,11 +149,11 @@ export const useWebsocketStore = defineStore("websocket", () => {
 
 			if (!subscriptions.value[name]) return;
 
-			for await (const subscription of subscriptions.value[
-				name
-			].values()) {
-				await subscription(...data);
-			}
+			await Promise.all(
+				Object.values(subscriptions.value[name]).map(
+					async subscription => subscription(...data) // TODO: Error handling
+				)
+			);
 		});
 
 		socket.value.addEventListener("close", () => {