123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- import { SessionSchema } from "@models/sessions/schema";
- import { getErrorMessage } from "@common/utils/getErrorMessage";
- import { generateUuid } from "@common/utils/generateUuid";
- import JobContext from "@/JobContext";
- import JobStatistics, { JobStatisticsType } from "@/JobStatistics";
- import LogBook, { Log } from "@/LogBook";
- import BaseModule from "./BaseModule";
- import EventsModule from "./modules/EventsModule";
- export enum JobStatus {
- QUEUED = "QUEUED",
- ACTIVE = "ACTIVE",
- PAUSED = "PAUSED",
- COMPLETED = "COMPLETED"
- }
- export type JobOptions = {
- priority?: number;
- longJob?: string;
- session?: SessionSchema;
- socketId?: string;
- callbackRef?: string;
- };
- export default abstract class Job {
- protected static _apiEnabled = true;
- protected _module: InstanceType<typeof BaseModule>;
- protected _payload: any;
- protected _context: JobContext;
- protected _priority: number;
- protected _longJob?: {
- title: string;
- progress?: {
- data: unknown;
- time: Date;
- timeout?: NodeJS.Timeout;
- };
- };
- protected _uuid: string;
- protected _status: JobStatus;
- protected _createdAt: number;
- protected _startedAt?: number;
- protected _completedAt?: number;
- /**
- * Job
- *
- * @param name - Job name
- * @param module - Job module
- * @param callback - Job callback
- * @param options - Job options
- */
- public constructor(
- module: InstanceType<typeof BaseModule>,
- payload: unknown,
- options?: JobOptions
- ) {
- this._createdAt = performance.now();
- this._module = module;
- this._payload = payload;
- this._priority = 1;
- this._status = JobStatus.QUEUED;
- /* eslint-disable no-bitwise, eqeqeq */
- this._uuid = generateUuid();
- let contextOptions;
- if (options) {
- const { priority, longJob, session, socketId, callbackRef } =
- options;
- if (session || socketId)
- contextOptions = { session, socketId, callbackRef };
- if (priority) this._priority = priority;
- if (longJob)
- this._longJob = {
- title: longJob
- };
- }
- this._context = new JobContext(this, contextOptions);
- JobStatistics.updateStats(
- this.getPath(),
- JobStatisticsType.CONSTRUCTED
- );
- }
- /**
- * getName - Get job name
- */
- public static getName() {
- return this.name.substring(0, 1).toLowerCase() + this.name.substring(1);
- }
- /**
- * getName - Get job name
- */
- public getName() {
- return (
- this.constructor.name.substring(0, 1).toLowerCase() +
- this.constructor.name.substring(1)
- );
- }
- /**
- * getPath - Get module and job name in a dot format, e.g. module.jobName
- */
- public getPath() {
- return `${this._module.getName()}.${this.getName()}`;
- }
- /**
- * getPriority - Get job priority
- *
- * @returns priority
- */
- public getPriority() {
- return this._priority;
- }
- /**
- * getUuid - Get job UUID
- *
- * @returns UUID
- */
- public getUuid() {
- return this._uuid;
- }
- /**
- * getStatus - Get job status
- *
- * @returns status
- */
- public getStatus() {
- return this._status;
- }
- /**
- * setStatus - Set job status
- *
- * @param status - Job status
- */
- protected _setStatus(status: JobStatus) {
- this._status = status;
- }
- /**
- * getModule - Get module
- *
- * @returns module
- */
- public getModule() {
- return this._module;
- }
- public static isApiEnabled() {
- return this._apiEnabled;
- }
- public isApiEnabled() {
- return (this.constructor as typeof Job)._apiEnabled;
- }
- protected async _validate() {}
- protected async _authorize() {
- await this._context.assertPermission(this.getPath());
- }
- protected abstract _execute(): Promise<unknown>;
- /**
- * execute - Execute job
- *
- * @returns Promise
- */
- public async execute() {
- if (this._startedAt) throw new Error("Job has already been executed.");
- if (!this.getModule().canRunJobs())
- throw new Error("Module can not currently run jobs.");
- this._setStatus(JobStatus.ACTIVE);
- this._startedAt = performance.now();
- try {
- await this._validate();
- await this._authorize();
- const data = await this._execute();
- const socketId = this._context.getSocketId();
- const callbackRef = this._context.getCallbackRef();
- if (callbackRef) {
- await EventsModule.publish(`job.${this.getUuid()}`, {
- socketId,
- callbackRef,
- status: "success",
- data
- });
- }
- this.log({
- message: "Job completed successfully",
- type: "success"
- });
- JobStatistics.updateStats(
- this.getPath(),
- JobStatisticsType.SUCCESSFUL
- );
- return data;
- } catch (error: unknown) {
- const message = getErrorMessage(error);
- const socketId = this._context.getSocketId();
- const callbackRef = this._context.getCallbackRef();
- if (callbackRef) {
- await EventsModule.publish(`job.${this.getUuid()}`, {
- socketId,
- callbackRef,
- status: "error",
- message
- });
- }
- this.log({
- message: `Job failed with error "${message}"`,
- type: "error",
- data: { error }
- });
- JobStatistics.updateStats(this.getPath(), JobStatisticsType.FAILED);
- throw error;
- } finally {
- this._completedAt = performance.now();
- JobStatistics.updateStats(this.getPath(), JobStatisticsType.TOTAL);
- if (this._startedAt)
- JobStatistics.updateStats(
- this.getPath(),
- JobStatisticsType.DURATION,
- this._completedAt - this._startedAt
- );
- this._setStatus(JobStatus.COMPLETED);
- }
- }
- /**
- * Log a message in the context of the current job, which automatically sets the category and data
- *
- * @param log - Log message or object
- */
- public log(log: string | Omit<Log, "timestamp" | "category">) {
- const {
- message,
- type = undefined,
- data = {}
- } = {
- ...(typeof log === "string" ? { message: log } : log)
- };
- LogBook.log({
- message,
- type,
- category: this.getPath(),
- data: {
- ...this.toJSON(),
- ...data
- }
- });
- }
- /**
- * Serialize job info
- *
- * @returns json
- */
- public toJSON() {
- return {
- uuid: this.getUuid(),
- priority: this.getPriority(),
- name: this.getPath(),
- status: this.getStatus(),
- moduleStatus: this._module.getStatus(),
- createdAt: this._createdAt,
- startedAt: this._startedAt,
- completedAt: this._completedAt,
- payload: JSON.stringify(this._payload)
- };
- }
- }
|