123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- import Job, { JobStatus, JobOptions } from "@/Job";
- import JobStatistics, { JobStatisticsType } from "./JobStatistics";
- import { JobDerived } from "./types/JobDerived";
- import assertJobDerived from "./utils/assertJobDerived";
- export class JobQueue {
- private _concurrency: number;
- private _isPaused: boolean;
- private _queue: Job[];
- private _active: Job[];
- private _processLock: boolean;
- private _callbacks: Record<
- string,
- {
- resolve: (value: unknown) => void;
- reject: (reason?: unknown) => void;
- }
- >;
- /**
- * Job Queue
- */
- public constructor() {
- this._concurrency = 50;
- this._isPaused = true;
- this._queue = [];
- this._active = [];
- this._callbacks = {};
- this._processLock = false;
- }
- /**
- * pause - Pause queue
- *
- * Pause processing of jobs in queue, active jobs will not be paused.
- */
- public pause() {
- this._isPaused = true;
- }
- /**
- * resume - Resume queue
- */
- public resume() {
- this._isPaused = false;
- this._process();
- }
- /**
- * runJob - Run a job
- */
- public async runJob(
- // eslint-disable-next-line @typescript-eslint/ban-types
- JobClass: Function,
- payload?: unknown,
- options?: JobOptions
- ): Promise<unknown> {
- assertJobDerived(JobClass);
- return new Promise<unknown>((resolve, reject) => {
- assertJobDerived(JobClass);
- const job = new (JobClass as JobDerived)(payload, options);
- this._callbacks[job.getUuid()] = { resolve, reject };
- JobStatistics.updateStats(job.getPath(), JobStatisticsType.QUEUED);
- this._queue.push(job);
- this._process();
- });
- }
- /**
- * queueJob - Queue a job
- */
- public async queueJob(
- // eslint-disable-next-line @typescript-eslint/ban-types
- JobClass: Function,
- payload?: unknown,
- options?: JobOptions
- ): Promise<string> {
- assertJobDerived(JobClass);
- const job = new (JobClass as JobDerived)(payload, options);
- JobStatistics.updateStats(job.getPath(), JobStatisticsType.QUEUED);
- this._queue.push(job);
- this._process();
- return job.getUuid();
- }
- /**
- * process - Process queue
- */
- private async _process() {
- // If the process is locked, don't continue. This prevents running process at the same time which could lead to issues
- if (this._processLock) return;
- // If the queue is paused, we've reached the maximum number of active jobs, or there are no jobs in the queue, don't continue
- if (
- this._isPaused ||
- this._active.length >= this._concurrency ||
- this._queue.length === 0
- )
- return;
- // Lock the process function
- this._processLock = true;
- // Sort jobs based on priority, with a lower priority being preferred
- const jobs = this._queue.sort(
- (a, b) => a.getPriority() - b.getPriority()
- );
- // Loop through all jobs
- for (let i = 0; i < jobs.length; i += 1) {
- const job = jobs[i];
- // If the module of the job is not started, we can't run the job, so go to the next job in the queue
- // eslint-disable-next-line no-continue
- if (!job.getModule().canRunJobs()) continue;
- // Remove the job from the queue and add it to the active jobs array
- this._queue.splice(this._queue.indexOf(job), 1);
- // Execute the job
- this._active.push(job);
- const callback = this._callbacks[job.getUuid()];
- job.execute()
- .then(callback?.resolve)
- .catch(callback?.reject)
- .catch(() => {}) // Ignore errors, any handling required is in job or callback
- .finally(() => {
- delete this._callbacks[job.getUuid()];
- // If the current job is in the active jobs array, remove it, and then run the process function to run another job
- const activeJobIndex = this._active.indexOf(job);
- if (activeJobIndex > -1) {
- this._active.splice(activeJobIndex, 1);
- }
- this._process();
- });
- // Stop the for loop
- if (this._active.length >= this._concurrency) break;
- }
- // Unlock the process after the for loop is finished, so it can be run again
- this._processLock = false;
- }
- /**
- * getStatus - Get status of job queue
- *
- * @returns Job queue status
- */
- public getStatus() {
- return {
- isPaused: this._isPaused,
- queueLength: this._queue.length,
- activeLength: this._active.length,
- concurrency: this._concurrency
- };
- }
- /**
- * getQueueStatus - Get statistics of queued or active jobs
- *
- * @param type - Job type filter
- * @returns Job queue statistics
- */
- public getQueueStatus(type?: JobStatus) {
- const status: Record<string, ReturnType<Job["toJSON"]>[]> = {};
- if (!type || type === JobStatus.ACTIVE)
- status.active = this._active.map(job => job.toJSON());
- if (!type || type === JobStatus.QUEUED)
- status.queue = this._queue.map(job => job.toJSON());
- return status;
- }
- }
- export default new JobQueue();
|