websocket.ts 6.6 KB


  1. import { defineStore } from "pinia";
  2. import { ref } from "vue";
  3. import { useConfigStore } from "./config";
  4. import { useUserAuthStore } from "./userAuth";
  5. import utils from "@/utils";
  6. import ms from "@/ms";
  /**
   * Pinia store that owns the application's WebSocket connection.
   *
   * Outgoing messages are JSON arrays of the form
   * `[job, payload, { callbackRef }]`. Incoming messages are JSON arrays that
   * are either `["jobCallback", callbackRef, response]` (the reply to a job)
   * or `[channel, ...args]` (an event delivered to the channel's callbacks).
   *
   * The store opens the socket as soon as it is created and tries to
   * reconnect one second after every close, unless the user is banned.
   */
  export const useWebsocketStore = defineStore("websocket", () => {
    type SubscriptionCallback = (...args: any[]) => any;
    type Subscription = {
      status: "subscribing" | "subscribed";
      callbacks: Record<string, SubscriptionCallback>;
    };
    type JobCallback = {
      resolve: (value?: unknown) => void;
      reject: (reason?: unknown) => void;
    };

    const configStore = useConfigStore();
    const userAuthStore = useUserAuthStore();

    // The active WebSocket instance (replaced on every reconnect).
    const socket = ref();
    // True once the "ready" message has been handled for the current socket.
    const ready = ref(false);
    // Callbacks to run every time the socket becomes ready, keyed by uuid.
    const readyCallbacks = ref<Record<string, () => any>>({});
    // Resolve/reject pairs of jobs awaiting a "jobCallback" reply.
    const jobCallbacks = ref<Record<string, JobCallback>>({});
    // Serialized jobs queued while the socket could not send.
    const pendingJobs = ref<string[]>([]);
    // Event subscriptions keyed by channel name.
    const subscriptions = ref<Record<string, Subscription>>({});

    // Channels delivered by the socket itself; no subscribe job is sent for
    // them and they are never unsubscribed on the server.
    const socketChannels = ["ready", "error"];

    /** Whether a message can be written to the socket right now. */
    const canSend = (): boolean =>
      ready.value && socket.value?.readyState === WebSocket.OPEN;

    /** Returns the subscription entry for `channel`, creating it if needed. */
    const ensureSubscription = (channel: string) => {
      if (!subscriptions.value[channel])
        subscriptions.value[channel] = {
          status: "subscribing",
          callbacks: {}
        };
      return subscriptions.value[channel];
    };

    /**
     * Sends a job over the socket, or queues it until the socket is ready.
     *
     * @param job - Name of the job to run.
     * @param payload - Job payload; an empty object is sent when omitted.
     * @returns A promise resolved with `response.data` when the reply has
     * status "success", and rejected with the whole response otherwise.
     */
    const runJob = (job: string, payload?: any): Promise<unknown> =>
      new Promise((resolve, reject) => {
        const callbackRef = utils.guid();
        const message = JSON.stringify([
          job,
          payload ?? {},
          { callbackRef }
        ]);

        // Register before sending so the reply can always be matched.
        jobCallbacks.value[callbackRef] = { resolve, reject };

        if (canSend()) socket.value.send(message);
        else pendingJobs.value.push(message);
      });

    /**
     * Registers `callback` for `channel`. For non-socket channels an
     * "events.subscribe" job is run first.
     *
     * @returns The uuid identifying the registered callback.
     */
    const subscribe = async (
      channel: string,
      callback: (data?: any) => any
    ) => {
      ensureSubscription(channel);

      if (!socketChannels.includes(channel)) {
        subscriptions.value[channel].status = "subscribing";
        await runJob("events.subscribe", { channel });
      }

      // Re-resolve the entry: it may have been replaced while awaiting.
      const subscription = ensureSubscription(channel);
      subscription.status = "subscribed";

      const uuid = utils.guid();
      subscription.callbacks[uuid] = callback;
      return uuid;
    };

    /**
     * Registers one callback per channel with a single
     * "events.subscribeMany" job covering the non-socket channels.
     *
     * @param channels - Map of channel name to callback.
     * @returns Map of callback uuid to `{ channel, callback }`.
     */
    const subscribeMany = async (
      channels: Record<string, (data?: any) => any>
    ) => {
      const channelsToSubscribeTo = Object.keys(channels).filter(
        channel => !socketChannels.includes(channel)
      );

      channelsToSubscribeTo.forEach(channel => {
        ensureSubscription(channel).status = "subscribing";
      });

      // Nothing to ask the server for when only socket channels were given.
      if (channelsToSubscribeTo.length > 0)
        await runJob("events.subscribeMany", {
          channels: channelsToSubscribeTo
        });

      return Object.fromEntries(
        Object.entries(channels).map(([channel, callback]) => {
          // Socket channels may not have an entry yet, so create it here
          // instead of dereferencing a missing subscription.
          const subscription = ensureSubscription(channel);
          subscription.status = "subscribed";

          const uuid = utils.guid();
          subscription.callbacks[uuid] = callback;
          return [uuid, { channel, callback }];
        })
      );
    };

    /**
     * Removes the callback `uuid` from `channel`. When it is the last
     * callback of a subscribed channel an "events.unsubscribe" job is run
     * first. Socket channels and unknown callbacks are ignored.
     */
    const unsubscribe = async (channel: string, uuid: string) => {
      if (
        socketChannels.includes(channel) ||
        !subscriptions.value[channel]?.callbacks[uuid]
      )
        return;

      if (
        subscriptions.value[channel].status === "subscribed" &&
        Object.keys(subscriptions.value[channel].callbacks).length <= 1
      )
        await runJob("events.unsubscribe", { channel });

      // The entry may have been removed while the job was in flight.
      const subscription = subscriptions.value[channel];
      if (!subscription) return;

      delete subscription.callbacks[uuid];
      if (Object.keys(subscription.callbacks).length === 0)
        delete subscriptions.value[channel];
    };

    /**
     * Removes several callbacks at once.
     *
     * A channel is unsubscribed on the server only when the callbacks being
     * removed are all the callbacks it currently has.
     *
     * @param channels - Map of callback uuid to channel name.
     */
    const unsubscribeMany = async (channels: Record<string, string>) => {
      const entries = Object.entries(channels);

      // Count, per channel, the callbacks that really exist and will go.
      const removalCounts = new Map<string, number>();
      entries.forEach(([uuid, channel]) => {
        if (subscriptions.value[channel]?.callbacks[uuid])
          removalCounts.set(
            channel,
            (removalCounts.get(channel) ?? 0) + 1
          );
      });

      const channelsToUnsubscribeFrom = [...removalCounts.keys()].filter(
        channel =>
          !socketChannels.includes(channel) &&
          subscriptions.value[channel].status === "subscribed" &&
          Object.keys(subscriptions.value[channel].callbacks).length <=
            (removalCounts.get(channel) ?? 0)
      );

      if (channelsToUnsubscribeFrom.length > 0)
        await runJob("events.unsubscribeMany", {
          channels: channelsToUnsubscribeFrom
        });

      return Promise.all(
        entries.map(async ([uuid, channel]) => {
          const subscription = subscriptions.value[channel];
          if (!subscription || !subscription.callbacks[uuid]) return;

          delete subscription.callbacks[uuid];
          if (Object.keys(subscription.callbacks).length === 0)
            delete subscriptions.value[channel];
        })
      );
    };

    /**
     * Runs "events.unsubscribeAll" and forgets every event subscription.
     * Socket-channel callbacks are kept: they include this store's own
     * "ready" handler, without which a reconnect could never become ready.
     */
    const unsubscribeAll = async () => {
      await runJob("events.unsubscribeAll");
      subscriptions.value = Object.fromEntries(
        Object.entries(subscriptions.value).filter(([channel]) =>
          socketChannels.includes(channel)
        )
      );
    };

    /**
     * Registers a callback to run each time the socket becomes ready. If the
     * socket is already ready the callback is also run immediately.
     *
     * @returns The uuid to pass to `removeReadyCallback`.
     */
    const onReady = async (callback: () => any) => {
      const uuid = utils.guid();
      readyCallbacks.value[uuid] = callback;
      if (ready.value) await callback();
      return uuid;
    };

    /** Removes a callback previously registered with `onReady`. */
    const removeReadyCallback = (uuid: string) => {
      if (!readyCallbacks.value[uuid]) return;
      delete readyCallbacks.value[uuid];
    };

    // "ready" is a socket channel, so this registers synchronously (no job is
    // awaited) and the handler is in place before init() opens the socket.
    subscribe("ready", async data => {
      configStore.$patch(data.config);

      userAuthStore.currentUser = data.user;
      userAuthStore.gotData = true;
      if (userAuthStore.loggedIn) userAuthStore.resetCookieExpiration();

      if (configStore.experimental.media_session) ms.initialize();
      else ms.uninitialize();

      ready.value = true;

      try {
        await userAuthStore.updatePermissions();

        await Promise.all(
          Object.values(readyCallbacks.value).map(
            async callback => callback() // TODO: Error handling
          )
        );
      } finally {
        // Always restore subscriptions and flush the queue, even when a
        // callback above failed; otherwise queued jobs would never be sent.
        await Promise.allSettled(
          Object.keys(subscriptions.value)
            .filter(channel => !socketChannels.includes(channel))
            .map(channel => runJob("events.subscribe", { channel }))
        );

        // The socket may have closed while awaiting; in that case keep the
        // queue for the next "ready".
        if (canSend()) {
          pendingJobs.value.forEach(message => socket.value.send(message));
          pendingJobs.value = [];
        }
      }
    });

    /**
     * Handles an incoming socket message: settles the matching job promise
     * for "jobCallback" messages, otherwise dispatches the remaining array
     * items to every callback subscribed to the named channel.
     */
    const onMessage = async (message: MessageEvent) => {
      let data: unknown;
      try {
        data = JSON.parse(message.data);
      } catch (error) {
        console.error("Ignoring malformed websocket message", error);
        return;
      }

      if (!Array.isArray(data)) {
        console.error("Ignoring unexpected websocket message", data);
        return;
      }

      const [name, ...args] = data;

      if (name === "jobCallback") {
        const [callbackRef, response] = args;
        const pending = jobCallbacks.value[callbackRef];

        if (response?.status === "success")
          pending?.resolve(response?.data);
        else pending?.reject(response);

        delete jobCallbacks.value[callbackRef];
        return;
      }

      const subscription = subscriptions.value[name];
      if (!subscription) return;

      await Promise.all(
        Object.values(subscription.callbacks).map(
          async callback => callback(...args) // TODO: Error handling
        )
      );
    };

    /** Marks the socket as not ready and schedules a reconnect. */
    const onClose = () => {
      ready.value = false;

      // try to reconnect every 1000ms, if the user isn't banned
      // eslint-disable-next-line no-use-before-define
      if (!userAuthStore.banned) setTimeout(init, 1000);
    };

    /**
     * Opens a new socket, first detaching from and closing the current one
     * if it is still connecting or open.
     */
    const init = () => {
      if (
        [WebSocket.CONNECTING, WebSocket.OPEN].includes(
          socket.value?.readyState
        )
      ) {
        // Detach first so this deliberate close does not schedule a
        // second reconnect through onClose.
        socket.value.removeEventListener("message", onMessage);
        socket.value.removeEventListener("close", onClose);
        socket.value.close();
      }

      socket.value = new WebSocket(`${configStore.urls.ws}?rewrite=1`);
      socket.value.addEventListener("message", onMessage);
      socket.value.addEventListener("close", onClose);
    };

    init();

    return {
      socket,
      ready,
      readyCallbacks,
      jobCallbacks,
      pendingJobs,
      subscriptions,
      runJob,
      subscribe,
      subscribeMany,
      unsubscribe,
      unsubscribeMany,
      unsubscribeAll,
      onReady,
      removeReadyCallback
    };
  });