JobQueue.ts 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import Job, { JobStatus, JobOptions } from "@/Job";
  2. import JobStatistics, { JobStatisticsType } from "./JobStatistics";
  3. import { JobDerived } from "./types/JobDerived";
  4. import assertJobDerived from "./utils/assertJobDerived";
  /**
   * JobQueue - Priority ordered job queue with bounded concurrency
   *
   * Jobs wait in the queue until the queue is resumed, the module they belong to
   * reports that it can run jobs, and fewer jobs than the concurrency limit are
   * active. A new queue starts paused with a concurrency limit of 50.
   */
  export class JobQueue {
    /** Maximum number of jobs that may be active at the same time */
    private _concurrency: number;

    /** While true no queued job is started, active jobs keep running */
    private _isPaused: boolean;

    /** Jobs waiting to be started */
    private _queue: Job[];

    /** Jobs that have been started and have not settled yet */
    private _active: Job[];

    /** Guard that prevents _process from running more than once at a time */
    private _processLock: boolean;

    /** Settle functions of the promises returned by runJob, keyed by job UUID */
    private _callbacks: Record<
      string,
      {
        resolve: (value: unknown) => void;
        reject: (reason?: unknown) => void;
      }
    >;

    /**
     * Job Queue
     */
    public constructor() {
      this._concurrency = 50;
      this._isPaused = true;
      this._queue = [];
      this._active = [];
      this._callbacks = {};
      this._processLock = false;
    }

    /**
     * pause - Pause queue
     *
     * Pause processing of jobs in queue, active jobs will not be paused.
     */
    public pause() {
      this._isPaused = true;
    }

    /**
     * resume - Resume queue
     *
     * Clears the paused flag and immediately tries to start queued jobs.
     */
    public resume() {
      this._isPaused = false;
      this._process();
    }

    /**
     * runJob - Run a job
     *
     * Creates the job, queues it, and reports the outcome of its execution.
     *
     * @param JobClass - Job class to instantiate, checked with assertJobDerived
     * @param payload - Payload passed to the job constructor
     * @param options - Options passed to the job constructor
     * @returns Promise resolved with the value job.execute() fulfils with, or rejected with its rejection reason
     */
    public async runJob(
      // eslint-disable-next-line @typescript-eslint/ban-types
      JobClass: Function,
      payload?: unknown,
      options?: JobOptions
    ): Promise<unknown> {
      assertJobDerived(JobClass);
      const job = new (JobClass as JobDerived)(payload, options);

      return new Promise<unknown>((resolve, reject) => {
        // Register the callbacks before queueing, so they exist when the job is started
        this._callbacks[job.getUuid()] = { resolve, reject };
        this._enqueue(job);
      });
    }

    /**
     * queueJob - Queue a job
     *
     * Creates the job and queues it without reporting its outcome, a rejection
     * of job.execute() is ignored by the queue.
     *
     * @param JobClass - Job class to instantiate, checked with assertJobDerived
     * @param payload - Payload passed to the job constructor
     * @param options - Options passed to the job constructor
     * @returns UUID of the queued job
     */
    public async queueJob(
      // eslint-disable-next-line @typescript-eslint/ban-types
      JobClass: Function,
      payload?: unknown,
      options?: JobOptions
    ): Promise<string> {
      assertJobDerived(JobClass);
      const job = new (JobClass as JobDerived)(payload, options);
      this._enqueue(job);
      return job.getUuid();
    }

    /**
     * enqueue - Record the job as queued, add it to the queue and trigger processing
     */
    private _enqueue(job: Job) {
      JobStatistics.updateStats(job.getPath(), JobStatisticsType.QUEUED);
      this._queue.push(job);
      this._process();
    }

    /**
     * process - Process queue
     *
     * Starts queued jobs in priority order (lower value first) until the
     * concurrency limit is reached. Jobs whose module can not run jobs stay queued.
     */
    private async _process() {
      // If the process is locked, don't continue. This prevents running process at the same time which could lead to issues
      if (this._processLock) return;

      // If the queue is paused, we've reached the maximum number of active jobs, or there are no jobs in the queue, don't continue
      if (
        this._isPaused ||
        this._active.length >= this._concurrency ||
        this._queue.length === 0
      )
        return;

      // Lock the process function
      this._processLock = true;

      try {
        // Sort jobs based on priority, with a lower priority being preferred
        this._queue.sort((a, b) => a.getPriority() - b.getPriority());

        // Iterate over a snapshot: started jobs are removed from the queue inside
        // the loop, and removing from the array being iterated would skip jobs
        const candidates = [...this._queue];

        for (let i = 0; i < candidates.length; i += 1) {
          // Stop as soon as the maximum number of active jobs is reached
          if (this._active.length >= this._concurrency) break;

          const job = candidates[i];

          // If the module of the job is not started, we can't run the job, so leave it in the queue
          if (job.getModule().canRunJobs()) this._start(job);
        }
      } finally {
        // Always unlock, even when an error was thrown, so the queue can never get stuck
        this._processLock = false;
      }
    }

    /**
     * start - Move a job from the queue to the active jobs and execute it
     */
    private _start(job: Job) {
      // Only remove the job if it is still queued, splice(-1, 1) would remove the last job instead
      const queueIndex = this._queue.indexOf(job);
      if (queueIndex > -1) this._queue.splice(queueIndex, 1);

      this._active.push(job);

      // Only jobs created by runJob have callbacks
      const callback = this._callbacks[job.getUuid()];

      job.execute()
        .then(callback?.resolve)
        .catch(callback?.reject)
        .catch(() => {}) // Ignore errors, any handling required is in job or callback
        .finally(() => {
          delete this._callbacks[job.getUuid()];

          // If the current job is in the active jobs array, remove it, and then run the process function to run another job
          const activeIndex = this._active.indexOf(job);
          if (activeIndex > -1) this._active.splice(activeIndex, 1);

          this._process();
        });
    }

    /**
     * getStatus - Get status of job queue
     *
     * @returns Job queue status
     */
    public getStatus() {
      return {
        isPaused: this._isPaused,
        queueLength: this._queue.length,
        activeLength: this._active.length,
        concurrency: this._concurrency
      };
    }

    /**
     * getQueueStatus - Get statistics of queued or active jobs
     *
     * @param type - Job type filter, when omitted both active and queued jobs are returned
     * @returns Job queue statistics
     */
    public getQueueStatus(type?: JobStatus) {
      const status: Record<string, ReturnType<Job["toJSON"]>[]> = {};
      if (!type || type === JobStatus.ACTIVE)
        status.active = this._active.map(job => job.toJSON());
      if (!type || type === JobStatus.QUEUED)
        status.queue = this._queue.map(job => job.toJSON());
      return status;
    }
  }
  /**
   * Job queue instance created when this module is loaded, exported as the default export
   */
  const jobQueue = new JobQueue();

  export default jobQueue;