Browse Source

fix: Frontend subscriptions not persisting reconnect

Owen Diffey 1 year ago
parent
commit
7d2438d5a5
1 changed files with 45 additions and 43 deletions
  1. 45 43
      frontend/src/stores/websocket.ts

+ 45 - 43
frontend/src/stores/websocket.ts

@@ -16,10 +16,7 @@ export const useWebsocketStore = defineStore("websocket", () => {
 	const pendingJobs = ref([]);
 	const subscriptions = ref({});
 
-	const socketChannels = {
-		ready: "subscribed",
-		error: "subscribed"
-	};
+	const socketChannels = ["ready", "error"];
 
 	const runJob = async (job: string, payload?: any) =>
 		new Promise((resolve, reject) => {
@@ -41,20 +38,22 @@ export const useWebsocketStore = defineStore("websocket", () => {
 		channel: string,
 		callback: (data?: any) => any
 	) => {
-		if (
-			["subscribed", "subscribing"].indexOf(socketChannels[channel]) ===
-			-1
-		) {
-			socketChannels[channel] = "subscribing";
+		if (!subscriptions.value[channel])
+			subscriptions.value[channel] = {
+				status: "subscribing",
+				callbacks: {}
+			};
+
+		if (!socketChannels.includes(channel)) {
+			subscriptions.value[channel].status = "subscribing";
 			await runJob("events.subscribe", { channel });
-			socketChannels[channel] = "subscribed";
 		}
 
-		if (!subscriptions.value[channel]) subscriptions.value[channel] = {};
+		subscriptions.value[channel].status = "subscribed";
 
 		const uuid = utils.guid();
 
-		subscriptions.value[channel][uuid] = callback;
+		subscriptions.value[channel].callbacks[uuid] = callback;
 
 		return uuid;
 	};
@@ -63,29 +62,32 @@ export const useWebsocketStore = defineStore("websocket", () => {
 		channels: Record<string, (data?: any) => any>
 	) => {
 		const channelsToSubscribeTo = Object.keys(channels).filter(
-			channel =>
-				["subscribed", "subscribing"].indexOf(
-					socketChannels[channel]
-				) === -1
+			channel => !socketChannels.includes(channel)
 		);
+
 		channelsToSubscribeTo.forEach(channel => {
-			socketChannels[channel] = "subscribing";
+			if (!subscriptions.value[channel])
+				subscriptions.value[channel] = {
+					status: "subscribing",
+					callbacks: {}
+				};
+
+			subscriptions.value[channel].status = "subscribing";
 		});
+
 		await runJob("events.subscribeMany", {
 			channels: channelsToSubscribeTo
 		});
+
 		channelsToSubscribeTo.forEach(channel => {
-			socketChannels[channel] = "subscribed";
+			subscriptions.value[channel].status = "subscribed";
 		});
 
 		return Object.fromEntries(
 			Object.entries(channels).map(([channel, callback]) => {
-				if (!subscriptions.value[channel])
-					subscriptions.value[channel] = {};
-
 				const uuid = utils.guid();
 
-				subscriptions.value[channel][uuid] = callback;
+				subscriptions.value[channel].callbacks[uuid] = callback;
 
 				return [uuid, { channel, callback }];
 			})
@@ -94,47 +96,55 @@ export const useWebsocketStore = defineStore("websocket", () => {
 
 	const unsubscribe = async (channel: string, uuid: string) => {
 		if (
+			socketChannels.includes(channel) ||
 			!subscriptions.value[channel] ||
-			!subscriptions.value[channel][uuid]
+			!subscriptions.value[channel].callbacks[uuid]
 		)
 			return;
 
 		if (
-			socketChannels[channel] === "subscribed" &&
-			Object.keys(subscriptions.value[channel]).length <= 1
+			subscriptions.value[channel].status === "subscribed" &&
+			Object.keys(subscriptions.value[channel].callbacks).length <= 1
 		)
 			await runJob("events.unsubscribe", { channel });
 
-		delete subscriptions.value[channel][uuid];
+		delete subscriptions.value[channel].callbacks[uuid];
 
-		if (Object.keys(subscriptions.value[channel]).length === 0)
+		if (Object.keys(subscriptions.value[channel].callbacks).length === 0)
 			delete subscriptions.value[channel];
 	};
 
 	const unsubscribeMany = async (channels: Record<string, string>) => {
 		const channelsToUnsubscribeFrom = Object.values(channels).filter(
 			channel =>
-				socketChannels[channel] === "subscribed" &&
-				Object.keys(subscriptions.value[channel]).length <= 1
+				!socketChannels.includes(channel) &&
+				subscriptions.value[channel] &&
+				subscriptions.value[channel].status === "subscribed" &&
+				Object.keys(subscriptions.value[channel].callbacks).length <= 1
 		);
+
 		await runJob("events.unsubscribeMany", {
 			channels: channelsToUnsubscribeFrom
 		});
+
 		channelsToUnsubscribeFrom.forEach(channel => {
-			delete socketChannels[channel];
+			delete subscriptions.value[channel];
 		});
 
 		return Promise.all(
 			Object.entries(channels).map(async ([uuid, channel]) => {
 				if (
 					!subscriptions.value[channel] ||
-					!subscriptions.value[channel][uuid]
+					!subscriptions.value[channel].callbacks[uuid]
 				)
 					return;
 
-				delete subscriptions.value[channel][uuid];
+				delete subscriptions.value[channel].callbacks[uuid];
 
-				if (Object.keys(subscriptions.value[channel]).length === 0)
+				if (
+					Object.keys(subscriptions.value[channel].callbacks)
+						.length === 0
+				)
 					delete subscriptions.value[channel];
 			})
 		);
@@ -144,9 +154,6 @@ export const useWebsocketStore = defineStore("websocket", () => {
 		await runJob("events.unsubscribeAll");
 
 		subscriptions.value = {};
-		Object.keys(socketChannels).forEach(channel => {
-			delete socketChannels[channel];
-		});
 	};
 
 	const onReady = async (callback: () => any) => {
@@ -190,12 +197,7 @@ export const useWebsocketStore = defineStore("websocket", () => {
 
 		await Promise.allSettled(
 			Object.keys(subscriptions.value)
-				.filter(
-					channel =>
-						["subscribing", "subscribed"].indexOf(
-							socketChannels[channel]
-						) === -1
-				)
+				.filter(channel => !socketChannels.includes(channel))
 				.map(channel => runJob("events.subscribe", { channel }))
 		);
 
@@ -234,7 +236,7 @@ export const useWebsocketStore = defineStore("websocket", () => {
 			if (!subscriptions.value[name]) return;
 
 			await Promise.all(
-				Object.values(subscriptions.value[name]).map(
+				Object.values(subscriptions.value[name].callbacks).map(
 					async subscription => subscription(...data) // TODO: Error handling
 				)
 			);