JobQueue.ts 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. import Job, { JobStatus } from "@/Job";
  2. import { JobOptions } from "@/types/JobOptions";
  3. import JobStatistics, { JobStatisticsType } from "./JobStatistics";
  4. import { JobDerived } from "./types/JobDerived";
  5. import assertJobDerived from "./utils/assertJobDerived";
  6. export class JobQueue {
  7. private _concurrency: number;
  8. private _isPaused: boolean;
  9. private _queue: Job[];
  10. private _active: Job[];
  11. private _processLock: boolean;
  12. private _callbacks: Record<
  13. string,
  14. {
  15. resolve: (value: unknown) => void;
  16. reject: (reason?: unknown) => void;
  17. }
  18. >;
  19. /**
  20. * Job Queue
  21. */
  22. public constructor() {
  23. this._concurrency = 50;
  24. this._isPaused = true;
  25. this._queue = [];
  26. this._active = [];
  27. this._callbacks = {};
  28. this._processLock = false;
  29. }
  30. /**
  31. * pause - Pause queue
  32. *
  33. * Pause processing of jobs in queue, active jobs will not be paused.
  34. */
  35. public pause() {
  36. this._isPaused = true;
  37. }
  38. /**
  39. * resume - Resume queue
  40. */
  41. public resume() {
  42. this._isPaused = false;
  43. this._process();
  44. }
  45. /**
  46. * runJob - Run a job
  47. */
  48. public async runJob(
  49. JobClass: Function,
  50. payload?: unknown,
  51. options?: JobOptions
  52. ): Promise<unknown> {
  53. assertJobDerived(JobClass);
  54. return new Promise<unknown>((resolve, reject) => {
  55. this.queueJob(JobClass as JobDerived, payload, options)
  56. .then(uuid => {
  57. this._callbacks[uuid] = { resolve, reject };
  58. })
  59. .catch(reject);
  60. });
  61. }
  62. /**
  63. * queueJob - Queue a job
  64. */
  65. public async queueJob(
  66. JobClass: Function,
  67. payload?: unknown,
  68. options?: JobOptions
  69. ): Promise<string> {
  70. assertJobDerived(JobClass);
  71. const job = new (JobClass as JobDerived)(payload, options);
  72. JobStatistics.updateStats(job.getPath(), JobStatisticsType.QUEUED);
  73. this._queue.push(job);
  74. this._process();
  75. return job.getUuid();
  76. }
  77. /**
  78. * process - Process queue
  79. */
  80. private async _process() {
  81. // If the process is locked, don't continue. This prevents running process at the same time which could lead to issues
  82. if (this._processLock) return;
  83. // If the queue is paused, we've reached the maximum number of active jobs, or there are no jobs in the queue, don't continue
  84. if (
  85. this._isPaused ||
  86. this._active.length >= this._concurrency ||
  87. this._queue.length === 0
  88. )
  89. return;
  90. // Lock the process function
  91. this._processLock = true;
  92. // Sort jobs based on priority, with a lower priority being preferred
  93. const jobs = this._queue.sort(
  94. (a, b) => a.getPriority() - b.getPriority()
  95. );
  96. // Loop through all jobs
  97. for (let i = 0; i < jobs.length; i += 1) {
  98. const job = jobs[i];
  99. // If the module of the job is not started, we can't run the job, so go to the next job in the queue
  100. // eslint-disable-next-line no-continue
  101. if (!job.getModule().canRunJobs()) continue;
  102. // Remove the job from the queue and add it to the active jobs array
  103. this._queue.splice(this._queue.indexOf(job), 1);
  104. // Execute the job
  105. this._active.push(job);
  106. const callback = this._callbacks[job.getUuid()];
  107. job.execute()
  108. .then(callback?.resolve)
  109. .catch(callback?.reject)
  110. .catch(() => {}) // Ignore errors, any handling required is in job or callback
  111. .finally(() => {
  112. delete this._callbacks[job.getUuid()];
  113. // If the current job is in the active jobs array, remove it, and then run the process function to run another job
  114. const activeJobIndex = this._active.indexOf(job);
  115. if (activeJobIndex > -1) {
  116. this._active.splice(activeJobIndex, 1);
  117. }
  118. this._process();
  119. });
  120. // Stop the for loop
  121. if (this._active.length >= this._concurrency) break;
  122. }
  123. // Unlock the process after the for loop is finished, so it can be run again
  124. this._processLock = false;
  125. }
  126. /**
  127. * getStatus - Get status of job queue
  128. *
  129. * @returns Job queue status
  130. */
  131. public getStatus() {
  132. return {
  133. isPaused: this._isPaused,
  134. queueLength: this._queue.length,
  135. activeLength: this._active.length,
  136. concurrency: this._concurrency
  137. };
  138. }
  139. /**
  140. * getQueueStatus - Get statistics of queued or active jobs
  141. *
  142. * @param type - Job type filter
  143. * @returns Job queue statistics
  144. */
  145. public getQueueStatus(type?: JobStatus) {
  146. const status: Record<string, ReturnType<Job["toJSON"]>[]> = {};
  147. if (!type || type === JobStatus.ACTIVE)
  148. status.active = this._active.map(job => job.toJSON());
  149. if (!type || type === JobStatus.QUEUED)
  150. status.queue = this._queue.map(job => job.toJSON());
  151. return status;
  152. }
  153. }
  154. export default new JobQueue();