JobQueue.ts 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. import BaseModule, { ModuleStatus } from "./BaseModule";
  2. import Job, { JobStatus } from "./Job";
  3. import { JobOptions } from "./types/JobOptions";
  4. import { Jobs, Modules } from "./types/Modules";
  /**
   * JobQueue - Priority queue that runs module jobs with bounded concurrency
   *
   * Jobs are queued with queueJob/runJob and started by process(), lowest
   * priority value first. A job is only started while its module reports
   * ModuleStatus.STARTED; otherwise it stays queued. The queue is created
   * paused, so nothing runs until resume() is called.
   */
  export default class JobQueue {
    /** Shared instance, replaceable through setPrimaryInstance. */
    static primaryInstance: JobQueue = new JobQueue();

    /** Maximum number of jobs allowed in `active` at the same time. */
    private readonly concurrency: number;

    /** While true, process() does not start any queued job. */
    private isPaused = true;

    /**
     * Every job ever queued on this instance.
     *
     * NOTE(review): nothing in this class removes entries from this list, so
     * it grows for the lifetime of the queue - confirm whether finished jobs
     * should be pruned.
     */
    private readonly jobs: Job[] = [];

    /** Jobs waiting to be started. */
    private readonly queue: Job[] = [];

    /** Jobs that have been started and have not settled yet. */
    private readonly active: Job[] = [];

    /** Guards process() against re-entrant calls made during a pass. */
    private processLock = false;

    /** Settlement callbacks of unfinished jobs, keyed by job UUID. */
    private readonly callbacks = new Map<
      string,
      {
        resolve: (value: any) => void;
        reject: (reason?: any) => void;
      }
    >();

    /**
     * Job Queue
     *
     * @param concurrency - Maximum number of simultaneously active jobs
     */
    public constructor(concurrency = 10000) {
      this.concurrency = concurrency;
    }

    /**
     * getJob - Fetch job
     *
     * @param jobId - Job UUID
     * @returns Job if found, otherwise undefined
     */
    public getJob(jobId: string): Job | undefined {
      return this.jobs.find(job => job.getUuid() === jobId);
    }

    /**
     * pause - Pause queue
     *
     * Pause processing of jobs in queue, active jobs will not be paused.
     */
    public pause(): void {
      this.isPaused = true;
    }

    /**
     * resume - Resume queue
     *
     * Clears the paused flag and immediately tries to start queued jobs.
     */
    public resume(): void {
      this.isPaused = false;
      void this.process();
    }

    /**
     * runJob - Run a job
     *
     * Queues the job and settles with the outcome of its execution.
     *
     * @param moduleName - Module name
     * @param jobName - Job name
     * @param payload - Job payload
     * @param options - Job options
     * @returns Promise settled with the result (or failure) of the job
     */
    public async runJob<
      ModuleNameType extends keyof Jobs & keyof Modules,
      JobNameType extends keyof Jobs[ModuleNameType] &
        keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
      PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
        ? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
          ? Record<string, never>
          : Jobs[ModuleNameType][JobNameType]["payload"]
        : Record<string, never>,
      ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
        ? Jobs[ModuleNameType][JobNameType]["returns"]
        : never
    >(
      moduleName: ModuleNameType,
      jobName: JobNameType,
      payload: PayloadType,
      options?: JobOptions
    ): Promise<ReturnType> {
      return new Promise<ReturnType>((resolve, reject) => {
        // If queueing itself fails, forward the failure to the caller instead
        // of leaving this promise pending forever
        this.queueJob(
          moduleName,
          jobName,
          payload,
          { resolve, reject },
          options
        ).catch(reject);
      });
    }

    /**
     * queueJob - Queue a job
     *
     * @param moduleName - Module name
     * @param jobName - Job name
     * @param payload - Job payload
     * @param callback - Functions invoked with the result or failure of the job
     * @param options - Job options
     * @returns UUID of the queued job
     */
    public async queueJob<
      ModuleNameType extends keyof Jobs & keyof Modules,
      JobNameType extends keyof Jobs[ModuleNameType] &
        keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
      PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
        ? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
          ? Record<string, never>
          : Jobs[ModuleNameType][JobNameType]["payload"]
        : Record<string, never>,
      ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
        ? Jobs[ModuleNameType][JobNameType]["returns"]
        : never
    >(
      moduleName: ModuleNameType,
      jobName: JobNameType,
      payload: PayloadType,
      callback: {
        resolve: (value: ReturnType) => void;
        reject: (reason?: any) => void;
      },
      options?: JobOptions
    ): Promise<string> {
      const job = new Job(jobName.toString(), moduleName, payload, options);
      const uuid = job.getUuid();

      this.callbacks.set(uuid, callback);
      this.jobs.push(job);
      this.queue.push(job);

      void this.process();

      return uuid;
    }

    /**
     * process - Process queue
     *
     * Starts as many queued jobs as the concurrency limit allows, lowest
     * priority value first. Jobs whose module is not started are left in the
     * queue for a later pass.
     */
    private async process(): Promise<void> {
      // A pass is already running further up the call stack, let it finish
      if (this.processLock) return;

      // Nothing to do while paused, at capacity, or with an empty queue
      if (
        this.isPaused ||
        this.active.length >= this.concurrency ||
        this.queue.length === 0
      )
        return;

      this.processLock = true;

      try {
        // Sort jobs based on priority, with a lower priority being preferred
        this.queue.sort((a, b) => a.getPriority() - b.getPriority());

        // Walk the live queue. The index only advances past jobs that stay
        // queued, so removing a started job never skips its successor.
        let index = 0;
        while (
          index < this.queue.length &&
          this.active.length < this.concurrency
        ) {
          const job = this.queue[index];
          if (job === undefined) break;

          if (job.getModule().getStatus() === ModuleStatus.STARTED) {
            this.queue.splice(index, 1);
            this.startJob(job);
          } else {
            // The module of the job is not started, try the next job
            index += 1;
          }
        }
      } finally {
        // Always unlock, otherwise one failing pass would block the queue
        this.processLock = false;
      }
    }

    /**
     * startJob - Execute a job that was just taken out of the queue
     *
     * Marks the job as active, reports its outcome through the registered
     * callback, and triggers another processing pass once it has settled.
     *
     * @param job - Job to execute
     */
    private startJob(job: Job): void {
      const uuid = job.getUuid();
      const callback = this.callbacks.get(uuid);

      this.active.push(job);

      job.execute()
        .then(result => callback?.resolve(result))
        .catch(error => callback?.reject(error))
        .finally(() => {
          this.callbacks.delete(uuid);

          // Free the slot taken by this job, then try to start another one
          const activeJobIndex = this.active.indexOf(job);
          if (activeJobIndex > -1) {
            this.active.splice(activeJobIndex, 1);
          }

          void this.process();
        });
    }

    /**
     * getStatus - Get status of job queue
     *
     * @returns Job queue status
     */
    public getStatus() {
      return {
        isPaused: this.isPaused,
        queueLength: this.queue.length,
        activeLength: this.active.length,
        concurrency: this.concurrency
      };
    }

    /**
     * getQueueStatus - Get statistics of queued or active jobs
     *
     * @param type - Job type filter, both lists are returned when omitted
     * @returns Serialised active and/or queued jobs
     */
    public getQueueStatus(type?: JobStatus) {
      const status: Record<string, ReturnType<Job["toJSON"]>[]> = {};
      const includeAll = !type;

      if (includeAll || type === JobStatus.ACTIVE)
        status.active = this.active.map(job => job.toJSON());

      if (includeAll || type === JobStatus.QUEUED)
        status.queue = this.queue.map(job => job.toJSON());

      return status;
    }

    /**
     * getJobs - Get the job array
     *
     * @returns The internal array of all jobs queued on this instance
     */
    public getJobs(): Job[] {
      return this.jobs;
    }

    /**
     * getPrimaryInstance - Get the shared queue instance
     *
     * @returns Primary job queue
     */
    static getPrimaryInstance(): JobQueue {
      return this.primaryInstance;
    }

    /**
     * setPrimaryInstance - Replace the shared queue instance
     *
     * @param instance - Job queue to use as primary instance
     */
    static setPrimaryInstance(instance: JobQueue): void {
      this.primaryInstance = instance;
    }
  }
  215. }