websocket.ts 6.7 KB


  1. import { defineStore } from "pinia";
  2. import { ref } from "vue";
  3. import { generateUuid } from "@common/utils/generateUuid";
  4. import { useConfigStore } from "./config";
  5. import { useUserAuthStore } from "./userAuth";
  6. import ms from "@/ms";
  7. export const useWebsocketStore = defineStore("websocket", () => {
  // Sibling stores: the "ready" handler patches the config store and fills in
  // the current user on the auth store.
  const configStore = useConfigStore();
  const userAuthStore = useUserAuthStore();

  // The active WebSocket connection; replaced by init() on every (re)connect.
  // Deliberately left untyped so the existing unguarded `socket.value.send(...)`
  // call sites keep compiling.
  const socket = ref();
  // Set to true while handling a "ready" message and back to false when the
  // socket closes.
  const ready = ref(false);
  // Callbacks registered through onReady(), keyed by a generated uuid.
  const readyCallbacks = ref<Record<string, () => any>>({});
  // Promise settlers of jobs awaiting a response, keyed by the callbackRef that
  // was sent along with the job.
  const jobCallbacks = ref<
    Record<
      string,
      { resolve: (value?: any) => void; reject: (reason?: any) => void }
    >
  >({});
  // Serialized job messages queued while the connection is not ready; flushed
  // by the "ready" handler.
  const pendingJobs = ref<string[]>([]);
  // Per-channel subscription state: status plus the callbacks keyed by uuid.
  const subscriptions = ref<
    Record<
      string,
      {
        status: "subscribing" | "subscribed";
        callbacks: Record<string, (...data: any[]) => any>;
      }
    >
  >({});
  // Channels dispatched locally: no events.subscribe/unsubscribe job is ever
  // sent for these.
  const socketChannels = ["ready", "error"];
  // Sends a job over the websocket and returns a promise for its outcome.
  //
  // The wire format is the JSON array `[job, payload, { callbackRef }]`, where
  // `payload` defaults to an empty object. The promise's resolve/reject pair is
  // stored in `jobCallbacks` under the generated callbackRef. When the store is
  // not ready or the socket is not open, the message is queued in `pendingJobs`
  // instead of being sent.
  const runJob = async (job: string, payload?: any) => {
    const outcome = new Promise((resolve, reject) => {
      const jobRef = generateUuid();
      const serialized = JSON.stringify([
        job,
        payload ?? {},
        { callbackRef: jobRef }
      ]);

      jobCallbacks.value[jobRef] = { resolve, reject };

      const canSendNow =
        ready.value && socket.value?.readyState === WebSocket.OPEN;
      if (!canSendNow) {
        pendingJobs.value.push(serialized);
        return;
      }
      socket.value.send(serialized);
    });
    return outcome;
  };
  // Registers `callback` for `channel` and returns the uuid identifying this
  // registration (pass it to unsubscribe() to remove it).
  //
  // For channels other than the local socket channels, an "events.subscribe"
  // job is run first unless the channel is already marked "subscribed".
  // If that job rejects, the rejection is propagated to the caller.
  const subscribe = async (
    channel: string,
    callback: (data?: any) => any
  ) => {
    // Returns the channel's entry, creating it when missing.
    const ensureEntry = () => {
      if (!subscriptions.value[channel])
        subscriptions.value[channel] = {
          status: "subscribing",
          callbacks: {}
        };
      return subscriptions.value[channel];
    };

    const entry = ensureEntry();

    if (!socketChannels.includes(channel) && entry.status !== "subscribed") {
      entry.status = "subscribing";
      try {
        await runJob("events.subscribe", { channel });
      } catch (err) {
        // Do not leave a phantom entry behind: if nobody else registered a
        // callback or finished subscribing in the meantime, drop the entry.
        const current = subscriptions.value[channel];
        if (
          current &&
          current.status !== "subscribed" &&
          Object.keys(current.callbacks).length === 0
        )
          delete subscriptions.value[channel];
        throw err;
      }
    }

    // The entry may have been removed while the job was in flight (e.g. by
    // unsubscribe/unsubscribeAll), so look it up again instead of assuming it
    // still exists.
    const subscription = ensureEntry();
    subscription.status = "subscribed";

    const uuid = generateUuid();
    subscription.callbacks[uuid] = callback;
    return uuid;
  };
  // Registers one callback per channel in a single round-trip.
  //
  // `channels` maps channel name -> callback. Channels that are neither local
  // socket channels nor already "subscribed" are requested from the backend
  // with one "events.subscribeMany" job; the job is skipped when there is
  // nothing to request. Returns an object mapping each generated uuid to
  // `{ channel, callback }`.
  const subscribeMany = async (
    channels: Record<string, (data?: any) => any>
  ) => {
    // Returns the channel's entry, creating it when missing.
    const ensureEntry = (channel: string) => {
      if (!subscriptions.value[channel])
        subscriptions.value[channel] = {
          status: "subscribing",
          callbacks: {}
        };
      return subscriptions.value[channel];
    };

    const channelsToSubscribeTo = Object.keys(channels).filter(
      channel =>
        !socketChannels.includes(channel) &&
        subscriptions.value[channel]?.status !== "subscribed"
    );

    channelsToSubscribeTo.forEach(channel => {
      ensureEntry(channel).status = "subscribing";
    });

    if (channelsToSubscribeTo.length > 0)
      await runJob("events.subscribeMany", {
        channels: channelsToSubscribeTo
      });

    // Entries can disappear while the job is in flight, hence ensureEntry.
    channelsToSubscribeTo.forEach(channel => {
      ensureEntry(channel).status = "subscribed";
    });

    return Object.fromEntries(
      Object.entries(channels).map(([channel, callback]) => {
        const uuid = generateUuid();
        // Socket channels are excluded from the list above, so their entry may
        // not exist yet; create it here rather than dereferencing undefined.
        const entry = ensureEntry(channel);
        // Socket channels need no backend subscription, so they count as
        // subscribed as soon as a callback is registered (as in subscribe()).
        if (socketChannels.includes(channel)) entry.status = "subscribed";
        entry.callbacks[uuid] = callback;
        return [uuid, { channel, callback }];
      })
    );
  };
  // Removes the callback registered under `uuid` for `channel`.
  //
  // Does nothing for local socket channels or when the channel/uuid is not
  // registered. When the callback being removed is the last one on a
  // "subscribed" channel, an "events.unsubscribe" job is run first. The channel
  // entry is deleted once it has no callbacks left.
  const unsubscribe = async (channel: string, uuid: string) => {
    if (
      socketChannels.includes(channel) ||
      !subscriptions.value[channel]?.callbacks[uuid]
    )
      return;

    const { status, callbacks } = subscriptions.value[channel];
    if (status === "subscribed" && Object.keys(callbacks).length <= 1)
      await runJob("events.unsubscribe", { channel });

    // The entry may already be gone if another unsubscribe call or
    // unsubscribeAll completed while the job was in flight.
    const entry = subscriptions.value[channel];
    if (!entry) return;

    delete entry.callbacks[uuid];
    if (Object.keys(entry.callbacks).length === 0)
      delete subscriptions.value[channel];
  };
  // Removes several callbacks at once.
  //
  // `channels` maps subscription uuid -> channel name. A channel is
  // unsubscribed on the backend (one "events.unsubscribeMany" job, skipped when
  // the list is empty) only when it is not a local socket channel, is marked
  // "subscribed", and every callback it currently holds is among the uuids
  // being removed. Unknown uuids/channels are ignored. Resolves to an array
  // with one (undefined) element per requested entry.
  const unsubscribeMany = async (channels: Record<string, string>) => {
    // Group the uuids that are actually registered by their channel, so the
    // decision below is based on what will really be removed.
    const removals: Record<string, string[]> = {};
    Object.entries(channels).forEach(([uuid, channel]) => {
      if (!subscriptions.value[channel]?.callbacks[uuid]) return;
      if (!removals[channel]) removals[channel] = [];
      removals[channel].push(uuid);
    });

    const channelsToUnsubscribeFrom = Object.keys(removals).filter(
      channel =>
        !socketChannels.includes(channel) &&
        subscriptions.value[channel].status === "subscribed" &&
        Object.keys(subscriptions.value[channel].callbacks).length <=
          removals[channel].length
    );

    if (channelsToUnsubscribeFrom.length > 0)
      await runJob("events.unsubscribeMany", {
        channels: channelsToUnsubscribeFrom
      });

    return Object.entries(channels).map(([uuid, channel]) => {
      // Re-read the entry: it may have changed while the job was in flight.
      const entry = subscriptions.value[channel];
      if (!entry || !entry.callbacks[uuid]) return;

      delete entry.callbacks[uuid];
      if (Object.keys(entry.callbacks).length === 0)
        delete subscriptions.value[channel];
    });
  };
  // Runs "events.unsubscribeAll" and then drops every event-channel
  // subscription held by this store.
  //
  // Local socket channels ("ready", "error") are kept: they are dispatched by
  // this store itself, and discarding them would also discard the store's own
  // "ready" handler, so later "ready" messages would be ignored.
  const unsubscribeAll = async () => {
    await runJob("events.unsubscribeAll");
    subscriptions.value = Object.fromEntries(
      Object.entries(subscriptions.value).filter(([channel]) =>
        socketChannels.includes(channel)
      )
    );
  };
  // Registers `callback` in `readyCallbacks` and returns the uuid it was stored
  // under. If the store is already ready, the callback is also invoked (and
  // awaited) right away.
  const onReady = async (callback: () => any) => {
    const callbackId = generateUuid();
    readyCallbacks.value[callbackId] = callback;

    if (!ready.value) return callbackId;

    await callback();
    return callbackId;
  };
  // Forgets the ready callback stored under `uuid`; a no-op when nothing is
  // registered under that uuid.
  const removeReadyCallback = (uuid: string) => {
    if (readyCallbacks.value[uuid]) {
      delete readyCallbacks.value[uuid];
    }
  };
  // Handles the "ready" message: applies the received config and user, marks
  // the store ready, runs the registered ready callbacks, re-subscribes to
  // every known event channel and finally flushes the queued jobs.
  subscribe("ready", async data => {
    configStore.$patch(data.config);

    userAuthStore.currentUser = data.user;
    userAuthStore.gotData = true;

    if (userAuthStore.loggedIn) {
      userAuthStore.resetCookieExpiration();
    }

    if (configStore.experimental.media_session) ms.initialize();
    else ms.uninitialize();

    ready.value = true;

    await userAuthStore.updatePermissions();

    // allSettled instead of all: a single failing ready callback must not
    // prevent the re-subscription and the pending-job flush below.
    const callbackResults = await Promise.allSettled(
      Object.values(readyCallbacks.value).map(async callback => callback())
    );
    callbackResults.forEach(result => {
      if (result.status === "rejected")
        console.error("[websocket] onReady callback failed", result.reason);
    });

    // Re-establish backend subscriptions for every non-socket channel.
    await Promise.allSettled(
      Object.keys(subscriptions.value)
        .filter(channel => !socketChannels.includes(channel))
        .map(channel => runJob("events.subscribe", { channel }))
    );

    // Send the jobs that were queued while the connection was not ready.
    pendingJobs.value.forEach(message => socket.value.send(message));
    pendingJobs.value = [];
  });
  // Dispatches an incoming websocket message.
  //
  // A message is a JSON array whose first element is its name:
  // - "jobCallback": followed by callbackRef and response; settles the matching
  //   entry of `jobCallbacks` (resolve with `response.data` on status
  //   "success", reject with the response otherwise) and removes it.
  // - anything else: treated as a channel name; the remaining elements are
  //   passed as arguments to every callback subscribed to that channel.
  // Messages that are not valid JSON arrays are ignored.
  const onMessage = async (message: MessageEvent) => {
    let parsed;
    try {
      parsed = JSON.parse(message.data);
    } catch (err) {
      console.error("[websocket] Ignoring message that is not valid JSON", err);
      return;
    }
    if (!Array.isArray(parsed) || parsed.length === 0) return;

    const [name, ...args] = parsed;

    if (name === "jobCallback") {
      const [callbackRef, response] = args;
      if (response?.status === "success")
        jobCallbacks.value[callbackRef]?.resolve(response?.data);
      else jobCallbacks.value[callbackRef]?.reject(response);
      delete jobCallbacks.value[callbackRef];
      return;
    }

    if (!subscriptions.value[name]) return;

    // allSettled so that one failing subscriber neither hides the others'
    // outcome nor turns into an unhandled rejection of this listener.
    const results = await Promise.allSettled(
      Object.values(subscriptions.value[name].callbacks).map(
        async subscription => subscription(...args)
      )
    );
    results.forEach(result => {
      if (result.status === "rejected")
        console.error(
          `[websocket] Subscription callback for "${name}" failed`,
          result.reason
        );
    });
  };
  // Socket "close" listener: marks the store as not ready and, unless the user
  // is banned, schedules a reconnect attempt 1000ms later.
  const onClose = () => {
    ready.value = false;

    if (userAuthStore.banned) return;

    // eslint-disable-next-line no-use-before-define
    setTimeout(init, 1000);
  };
  // (Re)creates the websocket connection. A previous socket that is still
  // connecting or open is closed and has its listeners detached first; the new
  // socket is then stored in `socket` and wired to onMessage/onClose.
  const init = () => {
    const previousState = socket.value?.readyState;
    const previousIsActive =
      previousState === WebSocket.CONNECTING ||
      previousState === WebSocket.OPEN;

    if (previousIsActive) {
      socket.value.close();
      socket.value.removeEventListener("message", onMessage);
      socket.value.removeEventListener("close", onClose);
    }

    const url = `${configStore.urls.ws}?rewrite=1`;
    socket.value = new WebSocket(url);
    socket.value.addEventListener("message", onMessage);
    socket.value.addEventListener("close", onClose);
  };
  // Open the first connection as soon as the store is created.
  init();

  // Reactive state exposed by the store.
  const exposedState = {
    socket,
    ready,
    readyCallbacks,
    jobCallbacks,
    pendingJobs,
    subscriptions
  };

  // Actions exposed by the store.
  const exposedActions = {
    runJob,
    subscribe,
    subscribeMany,
    unsubscribe,
    unsubscribeMany,
    unsubscribeAll,
    onReady,
    removeReadyCallback
  };

  return { ...exposedState, ...exposedActions };
  221. });