JobQueue.ts 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. import BaseModule from "./BaseModule";
  2. import Job from "./Job";
  3. import JobContext from "./JobContext";
  4. import LogBook from "./LogBook";
  5. import ModuleManager from "./ModuleManager";
  6. import { JobOptions } from "./types/JobOptions";
  7. import { JobStatus } from "./types/JobStatus";
  8. import { Jobs, Modules } from "./types/Modules";
  9. export default class JobQueue {
  10. private concurrency: number;
  11. private isPaused: boolean;
  12. private jobs: Job[];
  13. private queue: Job[];
  14. private active: Job[];
  15. private stats: Record<
  16. string,
  17. {
  18. successful: number;
  19. failed: number;
  20. total: number;
  21. added: number;
  22. averageTime: number;
  23. }
  24. >;
  25. private processLock: boolean;
  26. private moduleManager: ModuleManager;
  27. private logBook: LogBook;
  28. /**
  29. * Job Queue
  30. */
  31. public constructor(moduleManager: ModuleManager | null = null) {
  32. this.concurrency = 1;
  33. this.isPaused = true;
  34. this.jobs = [];
  35. this.queue = [];
  36. this.active = [];
  37. this.stats = {};
  38. this.processLock = false;
  39. this.moduleManager =
  40. moduleManager ?? ModuleManager.getPrimaryInstance();
  41. this.logBook = LogBook.getPrimaryInstance();
  42. }
  43. /**
  44. * add - Add job to queue
  45. *
  46. * @param job - Job
  47. */
  48. public add(job: Job, runDirectly: boolean) {
  49. this.updateStats(job.getName(), "added");
  50. this.jobs.push(job);
  51. if (runDirectly) {
  52. this.executeJob(job);
  53. } else {
  54. this.queue.push(job);
  55. setTimeout(() => {
  56. this.process();
  57. }, 0);
  58. }
  59. }
  60. /**
  61. * getJob - Fetch job
  62. *
  63. * @param jobId - Job UUID
  64. * @returns Job if found
  65. */
  66. public getJob(jobId: string, recursive = false) {
  67. let job = this.jobs.find(job => job.getUuid() === jobId);
  68. if (job || !recursive) return job;
  69. this.jobs.some(currentJob => {
  70. job = currentJob.getJobQueue().getJob(jobId, recursive);
  71. return !!job;
  72. });
  73. return job;
  74. }
  75. /**
  76. * pause - Pause queue
  77. *
  78. * Pause processing of jobs in queue, active jobs will not be paused.
  79. */
  80. public pause() {
  81. this.isPaused = true;
  82. }
  83. /**
  84. * resume - Resume queue
  85. */
  86. public resume() {
  87. this.isPaused = false;
  88. this.process();
  89. }
  90. /**
  91. * runJob - Run a job
  92. *
  93. * @param moduleName - Module name
  94. * @param jobName - Job name
  95. * @param params - Params
  96. */
  97. public runJob<
  98. ModuleNameType extends keyof Jobs & keyof Modules,
  99. JobNameType extends keyof Jobs[ModuleNameType] &
  100. keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
  101. PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
  102. ? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
  103. ? Record<string, never>
  104. : Jobs[ModuleNameType][JobNameType]["payload"]
  105. : Record<string, never>,
  106. ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
  107. ? Jobs[ModuleNameType][JobNameType]["returns"]
  108. : never
  109. >(
  110. moduleName: ModuleNameType,
  111. jobName: JobNameType,
  112. payload: PayloadType,
  113. options?: JobOptions
  114. ): Promise<ReturnType> {
  115. return new Promise((resolve, reject) => {
  116. const module = this.moduleManager.getModule(
  117. moduleName
  118. ) as Modules[ModuleNameType];
  119. if (!module) reject(new Error("Module not found."));
  120. else {
  121. const jobFunction = module[jobName];
  122. if (!jobFunction || typeof jobFunction !== "function")
  123. reject(new Error("Job not found."));
  124. else if (
  125. Object.prototype.hasOwnProperty.call(BaseModule, jobName)
  126. )
  127. reject(new Error("Illegal job function."));
  128. else {
  129. const job = new Job(
  130. jobName.toString(),
  131. module,
  132. (job, resolveJob, rejectJob) => {
  133. const jobContext = new JobContext(job);
  134. jobFunction
  135. .apply(module, [jobContext, payload])
  136. .then((response: ReturnType) => {
  137. this.logBook.log({
  138. message: "Job completed successfully",
  139. type: "success",
  140. category: "jobs",
  141. data: {
  142. jobName: job.getName(),
  143. jobId: job.getUuid()
  144. }
  145. });
  146. resolveJob();
  147. resolve(response);
  148. })
  149. .catch((err: any) => {
  150. this.logBook.log({
  151. message: `Job failed with error "${err}"`,
  152. type: "error",
  153. category: "jobs",
  154. data: {
  155. jobName: job.getName(),
  156. jobId: job.getUuid()
  157. }
  158. });
  159. rejectJob();
  160. reject(err);
  161. });
  162. },
  163. {
  164. priority: (options && options.priority) || 10
  165. }
  166. );
  167. const runDirectly = !!(options && options.runDirectly);
  168. this.add(job, runDirectly);
  169. }
  170. }
  171. });
  172. }
  173. /**
  174. * Actually run a job function
  175. *
  176. * @param job - Initiated job
  177. */
  178. public executeJob(job: Job) {
  179. // Record when we started a job
  180. const startTime = Date.now();
  181. this.active.push(job);
  182. job.execute()
  183. .then(() => {
  184. this.updateStats(job.getName(), "successful");
  185. })
  186. .catch(() => {
  187. this.updateStats(job.getName(), "failed");
  188. })
  189. .finally(() => {
  190. this.updateStats(job.getName(), "total");
  191. this.updateStats(
  192. job.getName(),
  193. "averageTime",
  194. Date.now() - startTime
  195. );
  196. // If the current job is in the active jobs array, remove it, and then run the process function to run another job
  197. const activeJobIndex = this.active.indexOf(job);
  198. if (activeJobIndex > -1) {
  199. this.active.splice(activeJobIndex, 1);
  200. setTimeout(() => {
  201. this.process();
  202. }, 0);
  203. }
  204. });
  205. }
  206. /**
  207. * process - Process queue
  208. */
  209. private process() {
  210. // If the process is locked, don't continue. This prevents running process at the same time which could lead to issues
  211. if (this.processLock) return;
  212. // If the queue is paused, we've reached the maximum number of active jobs, or there are no jobs in the queue, don't continue
  213. if (
  214. this.isPaused ||
  215. this.active.length >= this.concurrency ||
  216. this.queue.length === 0
  217. )
  218. return;
  219. // Lock the process function
  220. this.processLock = true;
  221. // Sort jobs based on priority, with a lower priority being preferred
  222. const jobs = this.queue.sort(
  223. (a, b) => a.getPriority() - b.getPriority()
  224. );
  225. // Loop through all jobs
  226. for (let i = 0; i < jobs.length; i += 1) {
  227. const job = jobs[i];
  228. // If the module of the job is not started, we can't run the job, so go to the next job in the queue
  229. // eslint-disable-next-line no-continue
  230. if (job.getModule().getStatus() !== "STARTED") continue;
  231. // Remove the job from the queue and add it to the active jobs array
  232. this.queue.splice(this.queue.indexOf(job), 1);
  233. // Execute the job
  234. this.executeJob(job);
  235. // Stop the for loop
  236. break;
  237. }
  238. // Unlock the process after the for loop is finished, so it can be run again
  239. this.processLock = false;
  240. }
  241. /**
  242. * getStatus - Get status of job queue
  243. *
  244. * @returns Job queue status
  245. */
  246. public getStatus() {
  247. return {
  248. isPaused: this.isPaused,
  249. queueLength: this.queue.length,
  250. activeLength: this.active.length,
  251. concurrency: this.concurrency
  252. };
  253. }
  254. /**
  255. * getStats - Get statistics of job queue
  256. *
  257. * @returns Job queue statistics
  258. */
  259. public getStats() {
  260. return {
  261. ...this.stats,
  262. total: Object.values(this.stats).reduce(
  263. (a, b) => ({
  264. successful: a.successful + b.successful,
  265. failed: a.failed + b.failed,
  266. total: a.total + b.total,
  267. added: a.added + b.added,
  268. averageTime: -1
  269. }),
  270. {
  271. successful: 0,
  272. failed: 0,
  273. total: 0,
  274. added: 0,
  275. averageTime: -1
  276. }
  277. )
  278. };
  279. }
  280. /**
  281. * getQueueStatus - Get statistics of queued or active jobs
  282. *
  283. * @param type - Job type filter
  284. * @returns Job queue statistics
  285. */
  286. public getQueueStatus(type?: JobStatus) {
  287. const status: Record<
  288. string,
  289. {
  290. uuid: string;
  291. priority: number;
  292. name: string;
  293. status: JobStatus;
  294. }[]
  295. > = {};
  296. const format = (job: Job) => ({
  297. uuid: job.getUuid(),
  298. priority: job.getPriority(),
  299. name: job.getName(),
  300. status: job.getStatus()
  301. });
  302. if (!type || type === "ACTIVE") status.active = this.active.map(format);
  303. if (!type || type === "QUEUED") status.queue = this.queue.map(format);
  304. return status;
  305. }
  306. /**
  307. * Gets the job array
  308. *
  309. */
  310. public getJobs() {
  311. return this.jobs;
  312. }
  313. /**
  314. * updateStats - Update job statistics
  315. *
  316. * @param jobName - Job name
  317. * @param type - Stats type
  318. * @param duration - Duration of job, for average time stats
  319. */
  320. private updateStats(
  321. jobName: string,
  322. type: "successful" | "failed" | "total" | "added" | "averageTime",
  323. duration?: number
  324. ) {
  325. if (!this.stats[jobName])
  326. this.stats[jobName] = {
  327. successful: 0,
  328. failed: 0,
  329. total: 0,
  330. added: 0,
  331. averageTime: 0
  332. };
  333. if (type === "averageTime" && duration)
  334. this.stats[jobName].averageTime +=
  335. (duration - this.stats[jobName].averageTime) /
  336. this.stats[jobName].total;
  337. else this.stats[jobName][type] += 1;
  338. }
  339. }