JobQueue.ts 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. import Job, { JobStatus } from "@/Job";
  2. import { JobOptions } from "@/types/JobOptions";
  3. import ModuleManager from "./ModuleManager";
  4. export class JobQueue {
  5. private _concurrency: number;
  6. private _isPaused: boolean;
  7. private _queue: Job[];
  8. private _active: Job[];
  9. private _processLock: boolean;
  10. private _callbacks: Record<
  11. string,
  12. {
  13. resolve: (value: unknown) => void;
  14. reject: (reason?: unknown) => void;
  15. }
  16. >;
  17. /**
  18. * Job Queue
  19. */
  20. public constructor() {
  21. this._concurrency = 50;
  22. this._isPaused = true;
  23. this._queue = [];
  24. this._active = [];
  25. this._callbacks = {};
  26. this._processLock = false;
  27. }
  28. /**
  29. * pause - Pause queue
  30. *
  31. * Pause processing of jobs in queue, active jobs will not be paused.
  32. */
  33. public pause() {
  34. this._isPaused = true;
  35. }
  36. /**
  37. * resume - Resume queue
  38. */
  39. public resume() {
  40. this._isPaused = false;
  41. this._process();
  42. }
  43. /**
  44. * runJob - Run a job
  45. */
  46. public async runJob(
  47. JobClass: typeof Job,
  48. payload?: unknown,
  49. options?: JobOptions
  50. ): Promise<unknown> {
  51. return new Promise<unknown>((resolve, reject) => {
  52. this.queueJob(JobClass, payload, options)
  53. .then(uuid => {
  54. this._callbacks[uuid] = { resolve, reject };
  55. })
  56. .catch(reject);
  57. });
  58. }
  59. /**
  60. * queueJob - Queue a job
  61. */
  62. public async queueJob(
  63. JobClass: typeof Job,
  64. payload?: unknown,
  65. options?: JobOptions
  66. ): Promise<string> {
  67. const job = new JobClass(payload, options);
  68. this._queue.push(job);
  69. this._process();
  70. return job.getUuid();
  71. }
  72. /**
  73. * process - Process queue
  74. */
  75. private async _process() {
  76. // If the process is locked, don't continue. This prevents running process at the same time which could lead to issues
  77. if (this._processLock) return;
  78. // If the queue is paused, we've reached the maximum number of active jobs, or there are no jobs in the queue, don't continue
  79. if (
  80. this._isPaused ||
  81. this._active.length >= this._concurrency ||
  82. this._queue.length === 0
  83. )
  84. return;
  85. // Lock the process function
  86. this._processLock = true;
  87. // Sort jobs based on priority, with a lower priority being preferred
  88. const jobs = this._queue.sort(
  89. (a, b) => a.getPriority() - b.getPriority()
  90. );
  91. // Loop through all jobs
  92. for (let i = 0; i < jobs.length; i += 1) {
  93. const job = jobs[i];
  94. // If the module of the job is not started, we can't run the job, so go to the next job in the queue
  95. // eslint-disable-next-line no-continue
  96. if (!job.getModule().canRunJobs()) continue;
  97. // Remove the job from the queue and add it to the active jobs array
  98. this._queue.splice(this._queue.indexOf(job), 1);
  99. // Execute the job
  100. this._active.push(job);
  101. const callback = this._callbacks[job.getUuid()];
  102. job.execute()
  103. .then(callback?.resolve)
  104. .catch(callback?.reject)
  105. .finally(() => {
  106. // If the current job is in the active jobs array, remove it, and then run the process function to run another job
  107. const activeJobIndex = this._active.indexOf(job);
  108. if (activeJobIndex > -1) {
  109. this._active.splice(activeJobIndex, 1);
  110. }
  111. this._process();
  112. });
  113. // Stop the for loop
  114. if (this._active.length >= this._concurrency) break;
  115. }
  116. // Unlock the process after the for loop is finished, so it can be run again
  117. this._processLock = false;
  118. }
  119. /**
  120. * getStatus - Get status of job queue
  121. *
  122. * @returns Job queue status
  123. */
  124. public getStatus() {
  125. return {
  126. isPaused: this._isPaused,
  127. queueLength: this._queue.length,
  128. activeLength: this._active.length,
  129. concurrency: this._concurrency
  130. };
  131. }
  132. /**
  133. * getQueueStatus - Get statistics of queued or active jobs
  134. *
  135. * @param type - Job type filter
  136. * @returns Job queue statistics
  137. */
  138. public getQueueStatus(type?: JobStatus) {
  139. const status: Record<string, ReturnType<Job["toJSON"]>[]> = {};
  140. if (!type || type === JobStatus.ACTIVE)
  141. status.active = this._active.map(job => job.toJSON());
  142. if (!type || type === JobStatus.QUEUED)
  143. status.queue = this._queue.map(job => job.toJSON());
  144. return status;
  145. }
  146. }
  147. export default new JobQueue();