Job.ts 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. import { SessionSchema } from "@models/sessions/schema";
  2. import { getErrorMessage } from "@common/utils/getErrorMessage";
  3. import { generateUuid } from "@common/utils/generateUuid";
  4. import JobContext from "@/JobContext";
  5. import JobStatistics, { JobStatisticsType } from "@/JobStatistics";
  6. import LogBook, { Log } from "@/LogBook";
  7. import BaseModule from "./BaseModule";
  8. import EventsModule from "./modules/EventsModule";
  9. export enum JobStatus {
  10. QUEUED = "QUEUED",
  11. ACTIVE = "ACTIVE",
  12. PAUSED = "PAUSED",
  13. COMPLETED = "COMPLETED"
  14. }
  15. export type JobOptions = {
  16. priority?: number;
  17. longJob?: string;
  18. session?: SessionSchema;
  19. socketId?: string;
  20. callbackRef?: string;
  21. };
  22. export default abstract class Job {
  23. protected static _apiEnabled = true;
  24. protected _module: InstanceType<typeof BaseModule>;
  25. protected _payload: any;
  26. protected _context: JobContext;
  27. protected _priority: number;
  28. protected _longJob?: {
  29. title: string;
  30. progress?: {
  31. data: unknown;
  32. time: Date;
  33. timeout?: NodeJS.Timeout;
  34. };
  35. };
  36. protected _uuid: string;
  37. protected _status: JobStatus;
  38. protected _createdAt: number;
  39. protected _startedAt?: number;
  40. protected _completedAt?: number;
  41. /**
  42. * Job
  43. *
  44. * @param name - Job name
  45. * @param module - Job module
  46. * @param callback - Job callback
  47. * @param options - Job options
  48. */
  49. public constructor(
  50. module: InstanceType<typeof BaseModule>,
  51. payload: unknown,
  52. options?: JobOptions
  53. ) {
  54. this._createdAt = performance.now();
  55. this._module = module;
  56. this._payload = payload;
  57. this._priority = 1;
  58. this._status = JobStatus.QUEUED;
  59. /* eslint-disable no-bitwise, eqeqeq */
  60. this._uuid = generateUuid();
  61. let contextOptions;
  62. if (options) {
  63. const { priority, longJob, session, socketId, callbackRef } =
  64. options;
  65. if (session || socketId)
  66. contextOptions = { session, socketId, callbackRef };
  67. if (priority) this._priority = priority;
  68. if (longJob)
  69. this._longJob = {
  70. title: longJob
  71. };
  72. }
  73. this._context = new JobContext(this, contextOptions);
  74. JobStatistics.updateStats(
  75. this.getPath(),
  76. JobStatisticsType.CONSTRUCTED
  77. );
  78. }
  79. /**
  80. * getName - Get job name
  81. */
  82. public static getName() {
  83. return this.name.substring(0, 1).toLowerCase() + this.name.substring(1);
  84. }
  85. /**
  86. * getName - Get job name
  87. */
  88. public getName() {
  89. return (
  90. this.constructor.name.substring(0, 1).toLowerCase() +
  91. this.constructor.name.substring(1)
  92. );
  93. }
  94. /**
  95. * getPath - Get module and job name in a dot format, e.g. module.jobName
  96. */
  97. public getPath() {
  98. return `${this._module.getName()}.${this.getName()}`;
  99. }
  100. /**
  101. * getPriority - Get job priority
  102. *
  103. * @returns priority
  104. */
  105. public getPriority() {
  106. return this._priority;
  107. }
  108. /**
  109. * getUuid - Get job UUID
  110. *
  111. * @returns UUID
  112. */
  113. public getUuid() {
  114. return this._uuid;
  115. }
  116. /**
  117. * getStatus - Get job status
  118. *
  119. * @returns status
  120. */
  121. public getStatus() {
  122. return this._status;
  123. }
  124. /**
  125. * setStatus - Set job status
  126. *
  127. * @param status - Job status
  128. */
  129. protected _setStatus(status: JobStatus) {
  130. this._status = status;
  131. }
  132. /**
  133. * getModule - Get module
  134. *
  135. * @returns module
  136. */
  137. public getModule() {
  138. return this._module;
  139. }
  140. public static isApiEnabled() {
  141. return this._apiEnabled;
  142. }
  143. public isApiEnabled() {
  144. return (this.constructor as typeof Job)._apiEnabled;
  145. }
  146. protected async _validate() {}
  147. protected async _authorize() {
  148. await this._context.assertPermission(this.getPath());
  149. }
  150. protected abstract _execute(): Promise<unknown>;
  151. /**
  152. * execute - Execute job
  153. *
  154. * @returns Promise
  155. */
  156. public async execute() {
  157. if (this._startedAt) throw new Error("Job has already been executed.");
  158. if (!this.getModule().canRunJobs())
  159. throw new Error("Module can not currently run jobs.");
  160. this._setStatus(JobStatus.ACTIVE);
  161. this._startedAt = performance.now();
  162. try {
  163. await this._validate();
  164. await this._authorize();
  165. const data = await this._execute();
  166. const socketId = this._context.getSocketId();
  167. const callbackRef = this._context.getCallbackRef();
  168. if (callbackRef) {
  169. await EventsModule.publish(`job.${this.getUuid()}`, {
  170. socketId,
  171. callbackRef,
  172. status: "success",
  173. data
  174. });
  175. }
  176. this.log({
  177. message: "Job completed successfully",
  178. type: "success"
  179. });
  180. JobStatistics.updateStats(
  181. this.getPath(),
  182. JobStatisticsType.SUCCESSFUL
  183. );
  184. return data;
  185. } catch (error: unknown) {
  186. const message = getErrorMessage(error);
  187. const socketId = this._context.getSocketId();
  188. const callbackRef = this._context.getCallbackRef();
  189. if (callbackRef) {
  190. await EventsModule.publish(`job.${this.getUuid()}`, {
  191. socketId,
  192. callbackRef,
  193. status: "error",
  194. message
  195. });
  196. }
  197. this.log({
  198. message: `Job failed with error "${message}"`,
  199. type: "error",
  200. data: { error }
  201. });
  202. JobStatistics.updateStats(this.getPath(), JobStatisticsType.FAILED);
  203. throw error;
  204. } finally {
  205. this._completedAt = performance.now();
  206. JobStatistics.updateStats(this.getPath(), JobStatisticsType.TOTAL);
  207. if (this._startedAt)
  208. JobStatistics.updateStats(
  209. this.getPath(),
  210. JobStatisticsType.DURATION,
  211. this._completedAt - this._startedAt
  212. );
  213. this._setStatus(JobStatus.COMPLETED);
  214. }
  215. }
  216. /**
  217. * Log a message in the context of the current job, which automatically sets the category and data
  218. *
  219. * @param log - Log message or object
  220. */
  221. public log(log: string | Omit<Log, "timestamp" | "category">) {
  222. const {
  223. message,
  224. type = undefined,
  225. data = {}
  226. } = {
  227. ...(typeof log === "string" ? { message: log } : log)
  228. };
  229. LogBook.log({
  230. message,
  231. type,
  232. category: this.getPath(),
  233. data: {
  234. ...this.toJSON(),
  235. ...data
  236. }
  237. });
  238. }
  239. /**
  240. * Serialize job info
  241. *
  242. * @returns json
  243. */
  244. public toJSON() {
  245. return {
  246. uuid: this.getUuid(),
  247. priority: this.getPriority(),
  248. name: this.getPath(),
  249. status: this.getStatus(),
  250. moduleStatus: this._module.getStatus(),
  251. createdAt: this._createdAt,
  252. startedAt: this._startedAt,
  253. completedAt: this._completedAt,
  254. payload: JSON.stringify(this._payload)
  255. };
  256. }
  257. }