ModelFetcher.ts

import { DeferredPromise } from "@common/DeferredPromise";
import { forEachIn } from "@common/utils/forEachIn";
import { useWebsocketStore } from "@/stores/websocket";

export interface ModelFetcherRequest {
	promise: DeferredPromise;
	payload: {
		modelName: string;
		modelIds: string[];
	};
}
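
// A minimal sketch of the DeferredPromise shape this file relies on, inferred
// from how it is used below (promise.promise.then/.catch, promise.resolve,
// promise.reject). The real @common/DeferredPromise implementation may differ:
//
// class DeferredPromise<T = any> {
// 	promise: Promise<T>;
// 	resolve!: (value: T) => void;
// 	reject!: (reason?: any) => void;
//
// 	constructor() {
// 		this.promise = new Promise<T>((resolve, reject) => {
// 			this.resolve = resolve;
// 			this.reject = reject;
// 		});
// 	}
// }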
// TODO combine the ModelFetcher and the ModelPermissionFetcher

/**
 * Class used for fetching models in bulk, at most once every 25ms, per model type.
 * So if we tried to fetch 100 different minifiedUser models separately, it would
 * make only 1 request to fetch the models, not 100 separate ones.
 * See the usage sketch at the end of this file.
 */
export class ModelFetcher {
	// Requests waiting for the next batch, across all model types
	private static requestsQueued: ModelFetcherRequest[] = [];

	// Whether a batch timeout is currently scheduled
	private static timeoutActive = false;

	// Fetched models, cached per model type and then per model id
	private static responseCache: Record<string, Record<string, any>> = {};

	private static fetch() {
		// If there is no other timeout running, indicate we will run one. Otherwise, return, as a timeout is already running
		if (!this.timeoutActive) this.timeoutActive = true;
		else return;

		setTimeout(() => {
			// Reset timeoutActive, so another timeout can run
			this.timeoutActive = false;

			// Make a copy of all requests currently queued, and then take those requests out of the queue so we can request them
			const requests = this.requestsQueued;
			this.requestsQueued = [];

			// Splits the requests per model
			const requestsPerModel: Record<string, ModelFetcherRequest[]> = {};
			requests.forEach(request => {
				const { modelName } = request.payload;
				if (!Array.isArray(requestsPerModel[modelName]))
					requestsPerModel[modelName] = [];
				requestsPerModel[modelName].push(request);
			});

			const modelNames = Object.keys(requestsPerModel);

			const { runJob } = useWebsocketStore();

			// TODO somehow make the following forEachIn run at the same time for all modelNames
			// Runs the requests per model
			forEachIn(modelNames, async modelName => {
				// All model ids that are already cached
				let cachedModelIds = Object.keys(this.responseCache[modelName]);

				// Gets a unique list of all model ids for the current model that we want to request, and that are not already cached
				const modelIds = Array.from(
					new Set(
						requestsPerModel[modelName].flatMap(
							request => request.payload.modelIds
						)
					)
				).filter(
					(modelId: string) => !cachedModelIds.includes(modelId)
				);

				// Only do a request if at least one model isn't already cached
				if (modelIds.length > 0) {
					console.log(`Requesting model ids`, modelName, modelIds);
					let errorCaught = false;
					const result = (await runJob(
						`data.${modelName}.findManyById`,
						{
							_ids: modelIds
						}
					).catch(err => {
						errorCaught = true;
						const requests = requestsPerModel[modelName];
						// For all requests, reject the deferred promise with the returned error.
						// TODO in the future, we want to handle mixed cases where one or more models were rejected, and one or more were returned
						requests.forEach(request => {
							const { promise } = request;
							promise.reject(err);
						});
					})) as any[];
					if (errorCaught) return;

					// Cache the responses for the requested model ids
					modelIds.forEach(modelId => {
						const model = result.find(
							model => model._id === modelId
						);
						console.log(`Caching ${modelName}.${modelId}`, model);
						this.responseCache[modelName][modelId] = model;
					});
				}
				const requests = requestsPerModel[modelName];
				// For all requests, resolve the deferred promise with the returned model(s) that were requested
				requests.forEach(request => {
					const { payload, promise } = request;
					const { modelIds } = payload;
					const models = modelIds
						.map(modelId => this.responseCache[modelName][modelId])
						.filter(model => model);
					// TODO add errors here for models that returned an error, or if the entire job returned an error
					promise.resolve(models);
				});
				// A unique list of model ids that will be requested in the next batch for the current model type
				const queuedModelIds = Array.from(
					new Set(
						this.requestsQueued
							.filter(
								request =>
									request.payload.modelName === modelName
							)
							.flatMap(request => request.payload.modelIds)
					)
				);
				// A list of model ids whose responses are currently cached
				cachedModelIds = Object.keys(this.responseCache[modelName]);
				// A list of the cached model responses that can safely be deleted, because no queued-up request needs them
				const cachedModelIdsToDelete = cachedModelIds.filter(
					cachedModelId => !queuedModelIds.includes(cachedModelId)
				);
				console.log(`Queued model ids`, modelName, queuedModelIds);
				console.log(`Cached model ids`, modelName, cachedModelIds);
				console.log(
					`Cached model ids to delete`,
					modelName,
					cachedModelIdsToDelete
				);

				// TODO In theory, we could check if any of the queued requests can be resolved here. Not worth it at the moment.
				cachedModelIdsToDelete.forEach(cachedModelIdToDelete => {
					delete this.responseCache[modelName][cachedModelIdToDelete];
				});
			});
		}, 25);
	}

	public static fetchModelsByIds(modelName: string, modelIds: string[]) {
		this.responseCache[modelName] ??= {};

		return new Promise((resolve, reject) => {
			const promise = new DeferredPromise();

			// Listens for the deferred promise response, before we actually push and fetch
			promise.promise.then(result => {
				resolve(result);
			});
			promise.promise.catch(err => {
				// TODO in the future, we want to handle these cases differently, returning an error per model, or one error if the entire function failed
				reject(err);
			});

			// Pushes the request to the queue
			this.requestsQueued.push({
				payload: {
					modelName,
					modelIds
				},
				promise
			});

			// Calls the fetch function, which will start a timeout (if one isn't already running) that will actually request the models
			this.fetch();
		});
	}
}
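
// A hypothetical usage sketch (the model name and ids below are assumptions,
// not taken from this file). Both calls land in the same 25ms window, so the
// ModelFetcher coalesces them into a single data.minifiedUser.findManyById
// job for the union of their ids, with "id2" only requested once:
//
// const [someUsers, otherUsers] = await Promise.all([
// 	ModelFetcher.fetchModelsByIds("minifiedUser", ["id1", "id2"]),
// 	ModelFetcher.fetchModelsByIds("minifiedUser", ["id2", "id3"])
// ]);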