|
@@ -1,14 +1,12 @@
|
|
|
import BaseModule from "./BaseModule";
|
|
|
import Job from "./Job";
|
|
|
-import JobContext from "./JobContext";
|
|
|
-import JobStatistics from "./JobStatistics";
|
|
|
-import LogBook from "./LogBook";
|
|
|
-import ModuleManager from "./ModuleManager";
|
|
|
import { JobOptions } from "./types/JobOptions";
|
|
|
import { JobStatus } from "./types/JobStatus";
|
|
|
import { Jobs, Modules } from "./types/Modules";
|
|
|
|
|
|
export default class JobQueue {
|
|
|
+ static primaryInstance = new this();
|
|
|
+
|
|
|
private concurrency: number;
|
|
|
|
|
|
private isPaused: boolean;
|
|
@@ -21,44 +19,25 @@ export default class JobQueue {
|
|
|
|
|
|
private processLock: boolean;
|
|
|
|
|
|
- private moduleManager: ModuleManager;
|
|
|
-
|
|
|
- private logBook: LogBook;
|
|
|
-
|
|
|
- private jobStatistics: JobStatistics;
|
|
|
+ private callbacks: Record<
|
|
|
+ string,
|
|
|
+ {
|
|
|
+ resolve: (value: unknown) => void;
|
|
|
+ reject: (reason?: any) => void;
|
|
|
+ }
|
|
|
+ >;
|
|
|
|
|
|
/**
|
|
|
* Job Queue
|
|
|
*/
|
|
|
- public constructor(moduleManager: ModuleManager | null = null) {
|
|
|
- this.concurrency = 1;
|
|
|
+ public constructor() {
|
|
|
+ this.concurrency = 10000;
|
|
|
this.isPaused = true;
|
|
|
this.jobs = [];
|
|
|
this.queue = [];
|
|
|
this.active = [];
|
|
|
+ this.callbacks = {};
|
|
|
this.processLock = false;
|
|
|
- this.moduleManager =
|
|
|
- moduleManager ?? ModuleManager.getPrimaryInstance();
|
|
|
- this.logBook = LogBook.getPrimaryInstance();
|
|
|
- this.jobStatistics = JobStatistics.getPrimaryInstance();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * add - Add job to queue
|
|
|
- *
|
|
|
- * @param job - Job
|
|
|
- */
|
|
|
- public add(job: Job, runDirectly: boolean) {
|
|
|
- this.jobStatistics.updateStats(job.getName(), "added");
|
|
|
- this.jobs.push(job);
|
|
|
- if (runDirectly) {
|
|
|
- this.executeJob(job);
|
|
|
- } else {
|
|
|
- this.queue.push(job);
|
|
|
- setTimeout(() => {
|
|
|
- this.process();
|
|
|
- }, 0);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -67,16 +46,8 @@ export default class JobQueue {
|
|
|
* @param jobId - Job UUID
|
|
|
* @returns Job if found
|
|
|
*/
|
|
|
- public getJob(jobId: string, recursive = false) {
|
|
|
- let job = this.jobs.find(job => job.getUuid() === jobId);
|
|
|
- if (job || !recursive) return job;
|
|
|
-
|
|
|
- this.jobs.some(currentJob => {
|
|
|
- job = currentJob.getJobQueue().getJob(jobId, recursive);
|
|
|
- return !!job;
|
|
|
- });
|
|
|
-
|
|
|
- return job;
|
|
|
+ public getJob(jobId: string) {
|
|
|
+ return this.jobs.find(job => job.getUuid() === jobId);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -103,7 +74,7 @@ export default class JobQueue {
|
|
|
* @param jobName - Job name
|
|
|
* @param params - Params
|
|
|
*/
|
|
|
- public runJob<
|
|
|
+ public async runJob<
|
|
|
ModuleNameType extends keyof Jobs & keyof Modules,
|
|
|
JobNameType extends keyof Jobs[ModuleNameType] &
|
|
|
keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
|
|
@@ -121,109 +92,27 @@ export default class JobQueue {
|
|
|
payload: PayloadType,
|
|
|
options?: JobOptions
|
|
|
): Promise<ReturnType> {
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- const module = this.moduleManager.getModule(
|
|
|
- moduleName
|
|
|
- ) as Modules[ModuleNameType];
|
|
|
- if (!module) reject(new Error("Module not found."));
|
|
|
- else {
|
|
|
- const jobFunction = module[jobName];
|
|
|
- if (!jobFunction || typeof jobFunction !== "function")
|
|
|
- reject(new Error("Job not found."));
|
|
|
- else if (
|
|
|
- Object.prototype.hasOwnProperty.call(BaseModule, jobName)
|
|
|
- )
|
|
|
- reject(new Error("Illegal job function."));
|
|
|
- else {
|
|
|
- const job = new Job(
|
|
|
- jobName.toString(),
|
|
|
- module,
|
|
|
- (job, resolveJob, rejectJob) => {
|
|
|
- const jobContext = new JobContext(job);
|
|
|
- jobFunction
|
|
|
- .apply(module, [jobContext, payload])
|
|
|
- .then((response: ReturnType) => {
|
|
|
- this.logBook.log({
|
|
|
- message: "Job completed successfully",
|
|
|
- type: "success",
|
|
|
- category: "jobs",
|
|
|
- data: {
|
|
|
- jobName: job.getName(),
|
|
|
- jobId: job.getUuid()
|
|
|
- }
|
|
|
- });
|
|
|
- resolveJob();
|
|
|
- resolve(response);
|
|
|
- })
|
|
|
- .catch((err: any) => {
|
|
|
- this.logBook.log({
|
|
|
- message: `Job failed with error "${err}"`,
|
|
|
- type: "error",
|
|
|
- category: "jobs",
|
|
|
- data: {
|
|
|
- jobName: job.getName(),
|
|
|
- jobId: job.getUuid()
|
|
|
- }
|
|
|
- });
|
|
|
- rejectJob();
|
|
|
- reject(err);
|
|
|
- });
|
|
|
- },
|
|
|
- {
|
|
|
- priority: (options && options.priority) || 10
|
|
|
- }
|
|
|
- );
|
|
|
-
|
|
|
- const runDirectly = !!(options && options.runDirectly);
|
|
|
-
|
|
|
- this.add(job, runDirectly);
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Actually run a job function
|
|
|
- *
|
|
|
- * @param job - Initiated job
|
|
|
- */
|
|
|
- public executeJob(job: Job) {
|
|
|
- // Record when we started a job
|
|
|
- const startTime = Date.now();
|
|
|
- this.active.push(job);
|
|
|
-
|
|
|
- job.execute()
|
|
|
- .then(() => {
|
|
|
- this.jobStatistics.updateStats(job.getName(), "successful");
|
|
|
- })
|
|
|
- .catch(() => {
|
|
|
- this.jobStatistics.updateStats(job.getName(), "failed");
|
|
|
- })
|
|
|
- .finally(() => {
|
|
|
- this.jobStatistics.updateStats(job.getName(), "total");
|
|
|
- this.jobStatistics.updateStats(
|
|
|
- job.getName(),
|
|
|
- "averageTime",
|
|
|
- Date.now() - startTime
|
|
|
- );
|
|
|
+ const job = new Job(jobName.toString(), moduleName, payload, options);
|
|
|
|
|
|
- job.setStatus("COMPLETED");
|
|
|
+ const runDirectly = !!(options && options.runDirectly);
|
|
|
|
|
|
- // If the current job is in the active jobs array, remove it, and then run the process function to run another job
|
|
|
- const activeJobIndex = this.active.indexOf(job);
|
|
|
- if (activeJobIndex > -1) {
|
|
|
- this.active.splice(activeJobIndex, 1);
|
|
|
- setTimeout(() => {
|
|
|
- this.process();
|
|
|
- }, 0);
|
|
|
- }
|
|
|
- });
|
|
|
+ this.jobs.push(job);
|
|
|
+ if (runDirectly) {
|
|
|
+ return job.execute();
|
|
|
+ }
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ this.callbacks[job.getUuid()] = { resolve, reject };
|
|
|
+ this.queue.push(job);
|
|
|
+ this.process();
|
|
|
+ }).finally(() => {
|
|
|
+ delete this.callbacks[job.getUuid()];
|
|
|
+ }) as Promise<ReturnType>;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* process - Process queue
|
|
|
*/
|
|
|
- private process() {
|
|
|
+ private async process() {
|
|
|
// If the process is locked, don't continue. This prevents running process at the same time which could lead to issues
|
|
|
if (this.processLock) return;
|
|
|
// If the queue is paused, we've reached the maximum number of active jobs, or there are no jobs in the queue, don't continue
|
|
@@ -254,10 +143,22 @@ export default class JobQueue {
|
|
|
this.queue.splice(this.queue.indexOf(job), 1);
|
|
|
|
|
|
// Execute the job
|
|
|
- this.executeJob(job);
|
|
|
-
|
|
|
+ this.active.push(job);
|
|
|
+
|
|
|
+ const callback = this.callbacks[job.getUuid()];
|
|
|
+ job.execute()
|
|
|
+ .then(callback.resolve)
|
|
|
+ .catch(callback.reject)
|
|
|
+ .finally(() => {
|
|
|
+ // If the current job is in the active jobs array, remove it, and then run the process function to run another job
|
|
|
+ const activeJobIndex = this.active.indexOf(job);
|
|
|
+ if (activeJobIndex > -1) {
|
|
|
+ this.active.splice(activeJobIndex, 1);
|
|
|
+ this.process();
|
|
|
+ }
|
|
|
+ });
|
|
|
// Stop the for loop
|
|
|
- break;
|
|
|
+ if (this.active.length >= this.concurrency) break;
|
|
|
}
|
|
|
|
|
|
// Unlock the process after the for loop is finished, so it can be run again
|
|
@@ -312,4 +213,12 @@ export default class JobQueue {
|
|
|
public getJobs() {
|
|
|
return this.jobs;
|
|
|
}
|
|
|
+
|
|
|
+ static getPrimaryInstance(): JobQueue {
|
|
|
+ return this.primaryInstance;
|
|
|
+ }
|
|
|
+
|
|
|
+ static setPrimaryInstance(instance: JobQueue) {
|
|
|
+ this.primaryInstance = instance;
|
|
|
+ }
|
|
|
}
|