123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- import BaseModule from "./BaseModule";
- import Job from "./Job";
- import JobContext from "./JobContext";
- import LogBook from "./LogBook";
- import ModuleManager from "./ModuleManager";
- import { JobOptions } from "./types/JobOptions";
- import { JobStatus } from "./types/JobStatus";
- import { Jobs, Modules } from "./types/Modules";
/**
 * JobQueue - Priority-ordered queue that runs module jobs with bounded concurrency.
 *
 * Jobs are either executed immediately (`runDirectly`) or placed in the queue and
 * picked up by `process()`, lowest priority number first. Queued jobs whose module
 * has not reached the "STARTED" status are skipped until a later pass.
 *
 * A new queue starts paused with a concurrency of 1; call `resume()` to begin
 * processing.
 */
export default class JobQueue {
	private concurrency: number;

	private isPaused: boolean;

	// Every job ever added to this queue (queued, active or finished); used by getJob
	private jobs: Job[];

	// Jobs waiting to be started by process()
	private queue: Job[];

	// Jobs currently executing
	private active: Job[];

	// Per job name counters, keyed by job name
	private stats: Record<
		string,
		{
			successful: number;
			failed: number;
			total: number;
			added: number;
			averageTime: number;
		}
	>;

	private processLock: boolean;

	private moduleManager: ModuleManager;

	private logBook: LogBook;

	/**
	 * Job Queue
	 *
	 * @param moduleManager - Module manager used to look up modules, falls back to the primary instance when null
	 */
	public constructor(moduleManager: ModuleManager | null = null) {
		this.concurrency = 1;
		this.isPaused = true;
		this.jobs = [];
		this.queue = [];
		this.active = [];
		this.stats = {};
		this.processLock = false;
		this.moduleManager =
			moduleManager ?? ModuleManager.getPrimaryInstance();
		this.logBook = LogBook.getPrimaryInstance();
	}

	/**
	 * add - Add job to queue
	 *
	 * @param job - Job
	 * @param runDirectly - If true the job is executed immediately, bypassing the queue, the paused state and the concurrency limit
	 */
	public add(job: Job, runDirectly: boolean): void {
		this.updateStats(job.getName(), "added");
		this.jobs.push(job);

		if (runDirectly) {
			this.executeJob(job);
			return;
		}

		this.queue.push(job);
		// Defer processing so the caller's current call stack finishes first
		setTimeout(() => {
			this.process();
		}, 0);
	}

	/**
	 * getJob - Fetch job
	 *
	 * @param jobId - Job UUID
	 * @param recursive - If true, also search the job queues of the jobs known to this queue
	 * @returns Job if found
	 */
	public getJob(jobId: string, recursive = false): Job | undefined {
		const ownJob = this.jobs.find(
			candidate => candidate.getUuid() === jobId
		);
		if (ownJob || !recursive) return ownJob;

		// Not one of our own jobs, so look through each job's own queue
		for (const parentJob of this.jobs) {
			const nestedJob = parentJob.getJobQueue().getJob(jobId, recursive);
			if (nestedJob) return nestedJob;
		}

		return undefined;
	}

	/**
	 * pause - Pause queue
	 *
	 * Pause processing of jobs in queue, active jobs will not be paused.
	 */
	public pause(): void {
		this.isPaused = true;
	}

	/**
	 * resume - Resume queue
	 *
	 * Unpauses the queue and immediately starts as many queued jobs as the concurrency allows.
	 */
	public resume(): void {
		this.isPaused = false;
		this.process();
	}

	/**
	 * runJob - Run a job
	 *
	 * @param moduleName - Module name
	 * @param jobName - Job name
	 * @param payload - Payload passed to the job function as its second argument
	 * @param options - Job options, priority defaults to 10 and runDirectly to false
	 * @returns Promise resolving with the job function's result, or rejecting with
	 * "Module not found.", "Job not found.", "Illegal job function." or the error the job failed with
	 */
	public runJob<
		ModuleNameType extends keyof Jobs & keyof Modules,
		JobNameType extends keyof Jobs[ModuleNameType] &
			keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
		PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
			? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
				? Record<string, never>
				: Jobs[ModuleNameType][JobNameType]["payload"]
			: Record<string, never>,
		ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
			? Jobs[ModuleNameType][JobNameType]["returns"]
			: never
	>(
		moduleName: ModuleNameType,
		jobName: JobNameType,
		payload: PayloadType,
		options?: JobOptions
	): Promise<ReturnType> {
		return new Promise((resolve, reject) => {
			const targetModule = this.moduleManager.getModule(
				moduleName
			) as Modules[ModuleNameType];
			if (!targetModule) {
				reject(new Error("Module not found."));
				return;
			}

			const jobFunction = targetModule[jobName];
			if (!jobFunction || typeof jobFunction !== "function") {
				reject(new Error("Job not found."));
				return;
			}

			// Methods declared on BaseModule itself live on its prototype and must never be callable as jobs
			if (
				Object.prototype.hasOwnProperty.call(
					BaseModule.prototype,
					jobName
				)
			) {
				reject(new Error("Illegal job function."));
				return;
			}

			const job = new Job(
				jobName.toString(),
				targetModule,
				(runningJob, resolveJob, rejectJob) => {
					const jobContext = new JobContext(runningJob);

					// Invoke inside a promise executor so a synchronous throw or a
					// non-promise return value is handled the same way as an async result
					new Promise<ReturnType>(settle => {
						settle(
							jobFunction.apply(targetModule, [
								jobContext,
								payload
							])
						);
					})
						.then(response => {
							this.logBook.log({
								message: "Job completed successfully",
								type: "success",
								category: "jobs",
								data: {
									jobName: runningJob.getName(),
									jobId: runningJob.getUuid()
								}
							});
							resolveJob();
							resolve(response);
						})
						.catch((err: unknown) => {
							this.logBook.log({
								message: `Job failed with error "${err}"`,
								type: "error",
								category: "jobs",
								data: {
									jobName: runningJob.getName(),
									jobId: runningJob.getUuid()
								}
							});
							rejectJob();
							reject(err);
						});
				},
				{
					// Nullish default so an explicit priority of 0 is honoured
					priority: options?.priority ?? 10
				}
			);

			this.add(job, Boolean(options?.runDirectly));
		});
	}

	/**
	 * Actually run a job function
	 *
	 * Marks the job as active, executes it and records its outcome and duration in the
	 * statistics. Once the job has settled it is removed from the active list and the
	 * queue is processed again.
	 *
	 * @param job - Initiated job
	 */
	public executeJob(job: Job): void {
		// Record when we started a job
		const startTime = Date.now();

		this.active.push(job);

		job.execute()
			.then(() => {
				this.updateStats(job.getName(), "successful");
			})
			.catch(() => {
				this.updateStats(job.getName(), "failed");
			})
			.finally(() => {
				// Total must be incremented before the average, as the running average divides by it
				this.updateStats(job.getName(), "total");
				this.updateStats(
					job.getName(),
					"averageTime",
					Date.now() - startTime
				);

				// If the current job is in the active jobs array, remove it, and then run the process function to run another job
				const activeJobIndex = this.active.indexOf(job);
				if (activeJobIndex > -1) {
					this.active.splice(activeJobIndex, 1);
					setTimeout(() => {
						this.process();
					}, 0);
				}
			});
	}

	/**
	 * process - Process queue
	 *
	 * Starts queued jobs, lowest priority number first, until the concurrency limit is
	 * reached. Jobs whose module is not started are left in the queue.
	 */
	private process(): void {
		// If the process is locked, don't continue. This prevents running process at the same time which could lead to issues
		if (this.processLock) return;

		// If the queue is paused, we've reached the maximum number of active jobs, or there are no jobs in the queue, don't continue
		if (
			this.isPaused ||
			this.active.length >= this.concurrency ||
			this.queue.length === 0
		)
			return;

		// Lock the process function
		this.processLock = true;

		try {
			// Sort jobs based on priority, with a lower priority being preferred
			this.queue.sort((a, b) => a.getPriority() - b.getPriority());

			// Walk the queue and start jobs for as long as there are free slots
			let index = 0;
			while (
				index < this.queue.length &&
				this.active.length < this.concurrency
			) {
				const job = this.queue[index];

				if (job.getModule().getStatus() !== "STARTED") {
					// If the module of the job is not started, we can't run the job, so go to the next job in the queue
					index += 1;
				} else {
					// Remove the job from the queue (the next job shifts into this index) and execute it
					this.queue.splice(index, 1);
					this.executeJob(job);
				}
			}
		} finally {
			// Always unlock, even if starting a job threw, so the queue can't get stuck
			this.processLock = false;
		}
	}

	/**
	 * getStatus - Get status of job queue
	 *
	 * @returns Job queue status
	 */
	public getStatus() {
		return {
			isPaused: this.isPaused,
			queueLength: this.queue.length,
			activeLength: this.active.length,
			concurrency: this.concurrency
		};
	}

	/**
	 * getStats - Get statistics of job queue
	 *
	 * @returns Job queue statistics per job name, plus a "total" entry summing all counters
	 * (averageTime is not aggregated and reported as -1 in the total)
	 */
	public getStats() {
		return {
			...this.stats,
			total: Object.values(this.stats).reduce(
				(a, b) => ({
					successful: a.successful + b.successful,
					failed: a.failed + b.failed,
					total: a.total + b.total,
					added: a.added + b.added,
					averageTime: -1
				}),
				{
					successful: 0,
					failed: 0,
					total: 0,
					added: 0,
					averageTime: -1
				}
			)
		};
	}

	/**
	 * getQueueStatus - Get statistics of queued or active jobs
	 *
	 * @param type - Job type filter, "ACTIVE" or "QUEUED" limits the result to that list, omitted returns both
	 * @returns Job queue statistics
	 */
	public getQueueStatus(type?: JobStatus) {
		const status: Record<
			string,
			{
				uuid: string;
				priority: number;
				name: string;
				status: JobStatus;
			}[]
		> = {};

		const format = (job: Job) => ({
			uuid: job.getUuid(),
			priority: job.getPriority(),
			name: job.getName(),
			status: job.getStatus()
		});

		if (!type || type === "ACTIVE") status.active = this.active.map(format);
		if (!type || type === "QUEUED") status.queue = this.queue.map(format);

		return status;
	}

	/**
	 * Gets the job array
	 *
	 * @returns The internal array of all jobs added to this queue (not a copy)
	 */
	public getJobs(): Job[] {
		return this.jobs;
	}

	/**
	 * updateStats - Update job statistics
	 *
	 * Counter types are incremented by one. "averageTime" folds the duration into a
	 * running average using the job's current total, so "total" has to be updated first.
	 *
	 * @param jobName - Job name
	 * @param type - Stats type
	 * @param duration - Duration of job, for average time stats
	 */
	private updateStats(
		jobName: string,
		type: "successful" | "failed" | "total" | "added" | "averageTime",
		duration?: number
	): void {
		if (!this.stats[jobName])
			this.stats[jobName] = {
				successful: 0,
				failed: 0,
				total: 0,
				added: 0,
				averageTime: 0
			};

		const entry = this.stats[jobName];

		if (type === "averageTime") {
			// A duration of 0ms is a valid sample; only skip when there is nothing to average
			if (duration === undefined || entry.total === 0) return;
			entry.averageTime += (duration - entry.averageTime) / entry.total;
			return;
		}

		entry[type] += 1;
	}
}
|