JobQueue.ts 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import BaseModule from "@/BaseModule";
  2. import Job, { JobStatus } from "@/Job";
  3. import { JobOptions } from "@/types/JobOptions";
  4. import { Jobs, Modules } from "@/types/Modules";
  5. export class JobQueue {
  6. private _concurrency: number;
  7. private _isPaused: boolean;
  8. private _jobs: Job[];
  9. private _queue: Job[];
  10. private _active: Job[];
  11. private _processLock: boolean;
  12. private _callbacks: Record<
  13. string,
  14. {
  15. resolve: (value: any) => void;
  16. reject: (reason?: any) => void;
  17. }
  18. >;
  19. /**
  20. * Job Queue
  21. */
  22. public constructor() {
  23. this._concurrency = 10000;
  24. this._isPaused = true;
  25. this._jobs = [];
  26. this._queue = [];
  27. this._active = [];
  28. this._callbacks = {};
  29. this._processLock = false;
  30. }
  31. /**
  32. * getJob - Fetch job
  33. *
  34. * @param jobId - Job UUID
  35. * @returns Job if found
  36. */
  37. public getJob(jobId: string) {
  38. return this._jobs.find(job => job.getUuid() === jobId);
  39. }
  40. /**
  41. * pause - Pause queue
  42. *
  43. * Pause processing of jobs in queue, active jobs will not be paused.
  44. */
  45. public pause() {
  46. this._isPaused = true;
  47. }
  48. /**
  49. * resume - Resume queue
  50. */
  51. public resume() {
  52. this._isPaused = false;
  53. this._process();
  54. }
  55. /**
  56. * runJob - Run a job
  57. *
  58. * @param moduleName - Module name
  59. * @param jobName - Job name
  60. * @param params - Params
  61. */
  62. public async runJob<
  63. ModuleNameType extends keyof Jobs & keyof Modules,
  64. JobNameType extends keyof Jobs[ModuleNameType] &
  65. keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
  66. PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
  67. ? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
  68. ? Record<string, never>
  69. : Jobs[ModuleNameType][JobNameType]["payload"]
  70. : Record<string, never>,
  71. ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
  72. ? Jobs[ModuleNameType][JobNameType]["returns"]
  73. : never
  74. >(
  75. moduleName: ModuleNameType,
  76. jobName: JobNameType,
  77. payload: PayloadType,
  78. options?: JobOptions
  79. ): Promise<ReturnType> {
  80. return new Promise<ReturnType>((resolve, reject) => {
  81. this.queueJob(
  82. moduleName,
  83. jobName,
  84. payload,
  85. { resolve, reject },
  86. options
  87. ).catch(reject);
  88. });
  89. }
  90. /**
  91. * queueJob - Queue a job
  92. *
  93. * @param moduleName - Module name
  94. * @param jobName - Job name
  95. * @param params - Params
  96. */
  97. public async queueJob<
  98. ModuleNameType extends keyof Jobs & keyof Modules,
  99. JobNameType extends keyof Jobs[ModuleNameType] &
  100. keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
  101. PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
  102. ? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
  103. ? Record<string, never>
  104. : Jobs[ModuleNameType][JobNameType]["payload"]
  105. : Record<string, never>,
  106. ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
  107. ? Jobs[ModuleNameType][JobNameType]["returns"]
  108. : never
  109. >(
  110. moduleName: ModuleNameType,
  111. jobName: JobNameType,
  112. payload: PayloadType,
  113. callback: {
  114. resolve: (value: ReturnType) => void;
  115. reject: (reason?: any) => void;
  116. },
  117. options?: JobOptions
  118. ): Promise<string> {
  119. const job = new Job(jobName.toString(), moduleName, payload, options);
  120. this._callbacks[job.getUuid()] = callback;
  121. this._jobs.push(job);
  122. this._queue.push(job);
  123. this._process();
  124. return job.getUuid();
  125. }
  126. /**
  127. * process - Process queue
  128. */
  129. private async _process() {
  130. // If the process is locked, don't continue. This prevents running process at the same time which could lead to issues
  131. if (this._processLock) return;
  132. // If the queue is paused, we've reached the maximum number of active jobs, or there are no jobs in the queue, don't continue
  133. if (
  134. this._isPaused ||
  135. this._active.length >= this._concurrency ||
  136. this._queue.length === 0
  137. )
  138. return;
  139. // Lock the process function
  140. this._processLock = true;
  141. // Sort jobs based on priority, with a lower priority being preferred
  142. const jobs = this._queue.sort(
  143. (a, b) => a.getPriority() - b.getPriority()
  144. );
  145. // Loop through all jobs
  146. for (let i = 0; i < jobs.length; i += 1) {
  147. const job = jobs[i];
  148. // If the module of the job is not started, we can't run the job, so go to the next job in the queue
  149. // eslint-disable-next-line no-continue
  150. if (!job.getModule().canRunJobs()) continue;
  151. // Remove the job from the queue and add it to the active jobs array
  152. this._queue.splice(this._queue.indexOf(job), 1);
  153. // Execute the job
  154. this._active.push(job);
  155. const callback = this._callbacks[job.getUuid()];
  156. job.execute()
  157. .then(callback.resolve)
  158. .catch(callback.reject)
  159. .finally(() => {
  160. delete this._callbacks[job.getUuid()];
  161. // If the current job is in the active jobs array, remove it, and then run the process function to run another job
  162. const activeJobIndex = this._active.indexOf(job);
  163. if (activeJobIndex > -1) {
  164. this._active.splice(activeJobIndex, 1);
  165. }
  166. this._process();
  167. });
  168. // Stop the for loop
  169. if (this._active.length >= this._concurrency) break;
  170. }
  171. // Unlock the process after the for loop is finished, so it can be run again
  172. this._processLock = false;
  173. }
  174. /**
  175. * getStatus - Get status of job queue
  176. *
  177. * @returns Job queue status
  178. */
  179. public getStatus() {
  180. return {
  181. isPaused: this._isPaused,
  182. queueLength: this._queue.length,
  183. activeLength: this._active.length,
  184. concurrency: this._concurrency
  185. };
  186. }
  187. /**
  188. * getQueueStatus - Get statistics of queued or active jobs
  189. *
  190. * @param type - Job type filter
  191. * @returns Job queue statistics
  192. */
  193. public getQueueStatus(type?: JobStatus) {
  194. const status: Record<string, ReturnType<Job["toJSON"]>[]> = {};
  195. if (!type || type === JobStatus.ACTIVE)
  196. status.active = this._active.map(job => job.toJSON());
  197. if (!type || type === JobStatus.QUEUED)
  198. status.queue = this._queue.map(job => job.toJSON());
  199. return status;
  200. }
  201. /**
  202. * Gets the job array
  203. *
  204. */
  205. public getJobs() {
  206. return this._jobs;
  207. }
  208. }
  209. export default new JobQueue();