websocket.ts 6.7 KB


  1. import { defineStore } from "pinia";
  2. import { ref } from "vue";
  3. import { generateUuid } from "@common/utils/generateUuid";
  4. import { forEachIn } from "@common/utils/forEachIn";
  5. import { useConfigStore } from "./config";
  6. import { useUserAuthStore } from "./userAuth";
  7. import ms from "@/ms";
  8. export const useWebsocketStore = defineStore("websocket", () => {
  9. const configStore = useConfigStore();
  10. const userAuthStore = useUserAuthStore();
  11. const socket = ref();
  12. const ready = ref(false);
  13. const readyCallbacks = ref({});
  14. const jobCallbacks = ref({});
  15. const pendingJobs = ref([]);
  16. const subscriptions = ref({});
  17. const socketChannels = ["ready", "error"];
  18. const runJob = async (job: string, payload?: any) =>
  19. new Promise((resolve, reject) => {
  20. const callbackRef = generateUuid();
  21. const message = JSON.stringify([
  22. job,
  23. payload ?? {},
  24. { callbackRef }
  25. ]);
  26. jobCallbacks.value[callbackRef] = { resolve, reject };
  27. if (ready.value && socket.value?.readyState === WebSocket.OPEN)
  28. socket.value.send(message);
  29. else pendingJobs.value.push(message);
  30. });
  31. const subscribe = async (
  32. channel: string,
  33. callback: (data?: any) => any
  34. ) => {
  35. if (!subscriptions.value[channel])
  36. subscriptions.value[channel] = {
  37. status: "subscribing",
  38. callbacks: {}
  39. };
  40. if (
  41. !socketChannels.includes(channel) &&
  42. subscriptions.value[channel].status !== "subscribed"
  43. ) {
  44. subscriptions.value[channel].status = "subscribing";
  45. await runJob("events.subscribe", { channel });
  46. }
  47. subscriptions.value[channel].status = "subscribed";
  48. const uuid = generateUuid();
  49. subscriptions.value[channel].callbacks[uuid] = callback;
  50. return uuid;
  51. };
  52. const subscribeMany = async (
  53. channels: Record<string, (data?: any) => any>
  54. ) => {
  55. const channelsToSubscribeTo = Object.keys(channels).filter(
  56. channel =>
  57. !socketChannels.includes(channel) &&
  58. subscriptions.value[channel]?.status !== "subscribed"
  59. );
  60. channelsToSubscribeTo.forEach(channel => {
  61. if (!subscriptions.value[channel])
  62. subscriptions.value[channel] = {
  63. status: "subscribing",
  64. callbacks: {}
  65. };
  66. subscriptions.value[channel].status = "subscribing";
  67. });
  68. await runJob("events.subscribeMany", {
  69. channels: channelsToSubscribeTo
  70. });
  71. channelsToSubscribeTo.forEach(channel => {
  72. subscriptions.value[channel].status = "subscribed";
  73. });
  74. return Object.fromEntries(
  75. Object.entries(channels).map(([channel, callback]) => {
  76. const uuid = generateUuid();
  77. subscriptions.value[channel].callbacks[uuid] = callback;
  78. return [uuid, { channel, callback }];
  79. })
  80. );
  81. };
  82. const unsubscribe = async (channel: string, uuid: string) => {
  83. if (
  84. socketChannels.includes(channel) ||
  85. !subscriptions.value[channel] ||
  86. !subscriptions.value[channel].callbacks[uuid]
  87. )
  88. return;
  89. if (
  90. subscriptions.value[channel].status === "subscribed" &&
  91. Object.keys(subscriptions.value[channel].callbacks).length <= 1
  92. )
  93. await runJob("events.unsubscribe", { channel });
  94. delete subscriptions.value[channel].callbacks[uuid];
  95. if (Object.keys(subscriptions.value[channel].callbacks).length === 0)
  96. delete subscriptions.value[channel];
  97. };
  98. const unsubscribeMany = async (channels: Record<string, string>) => {
  99. const channelsToUnsubscribeFrom = Object.values(channels).filter(
  100. channel =>
  101. !socketChannels.includes(channel) &&
  102. subscriptions.value[channel] &&
  103. subscriptions.value[channel].status === "subscribed" &&
  104. Object.keys(subscriptions.value[channel].callbacks).length <= 1
  105. );
  106. await runJob("events.unsubscribeMany", {
  107. channels: channelsToUnsubscribeFrom
  108. });
  109. channelsToUnsubscribeFrom.forEach(channel => {
  110. delete subscriptions.value[channel];
  111. });
  112. return forEachIn(Object.entries(channels), async ([uuid, channel]) => {
  113. if (
  114. !subscriptions.value[channel] ||
  115. !subscriptions.value[channel].callbacks[uuid]
  116. )
  117. return;
  118. delete subscriptions.value[channel].callbacks[uuid];
  119. if (
  120. Object.keys(subscriptions.value[channel].callbacks).length === 0
  121. )
  122. delete subscriptions.value[channel];
  123. });
  124. };
  125. const unsubscribeAll = async () => {
  126. await runJob("events.unsubscribeAll");
  127. subscriptions.value = {};
  128. };
  129. const onReady = async (callback: () => any) => {
  130. const uuid = generateUuid();
  131. readyCallbacks.value[uuid] = callback;
  132. if (ready.value) await callback();
  133. return uuid;
  134. };
  135. const removeReadyCallback = (uuid: string) => {
  136. if (!readyCallbacks.value[uuid]) return;
  137. delete readyCallbacks.value[uuid];
  138. };
  139. subscribe("ready", async data => {
  140. configStore.$patch(data.config);
  141. userAuthStore.currentUser = data.user;
  142. userAuthStore.gotData = true;
  143. if (userAuthStore.loggedIn) {
  144. userAuthStore.resetCookieExpiration();
  145. }
  146. if (configStore.experimental.media_session) ms.initialize();
  147. else ms.uninitialize();
  148. ready.value = true;
  149. await userAuthStore.updatePermissions();
  150. await forEachIn(
  151. Object.values(readyCallbacks.value),
  152. async callback => callback() // TODO: Error handling
  153. );
  154. await forEachIn(
  155. Object.keys(subscriptions.value).filter(
  156. channel => !socketChannels.includes(channel)
  157. ),
  158. channel => runJob("events.subscribe", { channel }),
  159. { onError: Promise.resolve }
  160. );
  161. pendingJobs.value.forEach(message => socket.value.send(message));
  162. pendingJobs.value = [];
  163. });
  164. const onMessage = async message => {
  165. const data = JSON.parse(message.data);
  166. const name = data.shift(0);
  167. if (name === "jobCallback") {
  168. const callbackRef = data.shift(0);
  169. const response = data.shift(0);
  170. if (response?.status === "success")
  171. jobCallbacks.value[callbackRef]?.resolve(response?.data);
  172. else jobCallbacks.value[callbackRef]?.reject(response);
  173. delete jobCallbacks.value[callbackRef];
  174. return;
  175. }
  176. if (!subscriptions.value[name]) return;
  177. await forEachIn(
  178. Object.values(subscriptions.value[name].callbacks),
  179. async subscription => subscription(...data) // TODO: Error handling
  180. );
  181. };
  182. const onClose = () => {
  183. ready.value = false;
  184. // try to reconnect every 1000ms, if the user isn't banned
  185. // eslint-disable-next-line no-use-before-define
  186. if (!userAuthStore.banned) setTimeout(init, 1000);
  187. };
  188. const init = () => {
  189. if (
  190. [WebSocket.CONNECTING, WebSocket.OPEN].includes(
  191. socket.value?.readyState
  192. )
  193. ) {
  194. socket.value.close();
  195. socket.value.removeEventListener("message", onMessage);
  196. socket.value.removeEventListener("close", onClose);
  197. }
  198. socket.value = new WebSocket(`${configStore.urls.ws}?rewrite=1`);
  199. socket.value.addEventListener("message", onMessage);
  200. socket.value.addEventListener("close", onClose);
  201. };
  202. init();
  203. return {
  204. socket,
  205. ready,
  206. readyCallbacks,
  207. jobCallbacks,
  208. pendingJobs,
  209. subscriptions,
  210. runJob,
  211. subscribe,
  212. subscribeMany,
  213. unsubscribe,
  214. unsubscribeMany,
  215. unsubscribeAll,
  216. onReady,
  217. removeReadyCallback
  218. };
  219. });