瀏覽代碼

refactor: Use promises for module startup and shutdown

Owen Diffey 2 年之前
父節點
當前提交
80607e19bf
共有 5 個文件被更改,包括 113 次插入51 次删除
  1. 14 7
      backend/src/BaseModule.ts
  2. 65 31
      backend/src/ModuleManager.ts
  3. 12 1
      backend/src/main.ts
  4. 11 6
      backend/src/modules/OtherModule.ts
  5. 11 6
      backend/src/modules/StationModule.ts

+ 14 - 7
backend/src/BaseModule.ts

@@ -46,16 +46,19 @@ export default abstract class BaseModule {
 	 *
 	 * @param {ModuleStatus} status Module status
 	 */
-	protected setStatus(status: ModuleStatus): void {
+	public setStatus(status: ModuleStatus): void {
 		this.status = status;
 	}
 
 	/**
 	 * startup - Startup module
 	 */
-	public startup(): void {
-		console.log(`Module (${this.name}) starting`);
-		this.setStatus("STARTING");
+	public startup(): Promise<void> {
+		return new Promise(resolve => {
+			console.log(`Module (${this.name}) starting`);
+			this.setStatus("STARTING");
+			resolve();
+		});
 	}
 
 	/**
@@ -69,9 +72,13 @@ export default abstract class BaseModule {
 	/**
 	 * shutdown - Shutdown module
 	 */
-	public shutdown(): void {
-		console.log(`Module (${this.name}) stopping`);
-		this.setStatus("STOPPING");
+	public shutdown(): Promise<void> {
+		return new Promise(resolve => {
+			console.log(`Module (${this.name}) stopping`);
+			this.setStatus("STOPPING");
+			this.stopped();
+			resolve();
+		});
 	}
 
 	/**

+ 65 - 31
backend/src/ModuleManager.ts

@@ -1,3 +1,4 @@
+import async from "async";
 import BaseModule from "./BaseModule";
 import Job from "./Job";
 import JobQueue from "./JobQueue";
@@ -10,7 +11,7 @@ type ModuleClass<Module extends typeof BaseModule> = {
 };
 
 export default class ModuleManager {
-	private modules: Modules | null;
+	private modules?: Modules;
 
 	private jobQueue: JobQueue;
 
@@ -19,7 +20,6 @@ export default class ModuleManager {
 	 *
 	 */
 	public constructor() {
-		this.modules = null;
 		this.jobQueue = new JobQueue();
 	}
 
@@ -29,14 +29,10 @@ export default class ModuleManager {
 	 * @returns {object} Module statuses
 	 */
 	public getStatus() {
-		if (!this.modules) return {};
-
 		const status: Record<string, ModuleStatus> = {};
-
-		Object.entries(this.modules).forEach(([name, module]) => {
+		Object.entries(this.modules || {}).forEach(([name, module]) => {
 			status[name] = module.getStatus();
 		});
-
 		return status;
 	}
 
@@ -68,31 +64,29 @@ export default class ModuleManager {
 	}
 
 	/**
+	 * loadModule - Load and initialize module
 	 *
 	 * @param {string} moduleName Name of the module
 	 * @returns {typeof BaseModule} Module
 	 */
-	private getModule<T extends keyof Modules>(
+	private loadModule<T extends keyof Modules>(
 		moduleName: T
 	): Promise<Modules[T]> {
 		return new Promise(resolve => {
 			const mapper = {
 				stations: "StationModule",
-				others: "OtherModule"
+				others: "OtherModule",
+				data: "DataModule"
 			};
-			import(`./modules/${mapper[moduleName]}`).then(response => {
-				const Module: ModuleClass<Modules[T]> = response.default;
-				const module = new Module(this);
-				resolve(module);
-			});
+			import(`./modules/${mapper[moduleName]}`).then(
+				({ default: Module }: { default: ModuleClass<Modules[T]> }) => {
+					const module = new Module(this);
+					resolve(module);
+				}
+			);
 		});
 	}
 
-	/**
-	 * loadModules - Load and initialize all modules
-	 *
-	 * @returns {Promise} Promise
-	 */
 	/**
 	 * loadModules - Load and initialize all modules
 	 *
@@ -101,8 +95,9 @@ export default class ModuleManager {
 	private loadModules(): Promise<void> {
 		return new Promise((resolve, reject) => {
 			const fetchModules = async () => ({
-				stations: await this.getModule("stations"),
-				others: await this.getModule("others")
+				data: await this.loadModule("data"),
+				others: await this.loadModule("others"),
+				stations: await this.loadModule("stations")
 			});
 			fetchModules()
 				.then(modules => {
@@ -121,20 +116,60 @@ export default class ModuleManager {
 	public startup(): void {
 		this.loadModules()
 			.then(() => {
-				if (!this.modules) return;
-				Object.values(this.modules).forEach(module => module.startup());
-				this.jobQueue.resume();
+				if (!this.modules) throw new Error("No modules were loaded");
+				async.each(
+					Object.values(this.modules),
+					(module, next) => {
+						module
+							.startup()
+							.then(() => next())
+							.catch(err => {
+								module.setStatus("ERROR");
+								next(err);
+							});
+					},
+					async err => {
+						if (err) {
+							await this.shutdown();
+							throw err;
+						}
+						this.jobQueue.resume();
+					}
+				);
 			})
-			.catch(() => this.shutdown());
+			.catch(async err => {
+				await this.shutdown();
+				throw err;
+			});
 	}
 
 	/**
 	 * shutdown - Handle shutdown
 	 */
-	public shutdown(): void {
-		if (!this.modules) return;
-		console.log(this.modules);
-		Object.values(this.modules).forEach(module => module.shutdown());
+	public shutdown(): Promise<void> {
+		return new Promise((resolve, reject) => {
+			// TODO: await jobQueue completion/handle shutdown
+			if (this.modules)
+				async.each(
+					Object.values(this.modules),
+					(module, next) => {
+						if (
+							module.getStatus() === "STARTED" ||
+							module.getStatus() === "STARTING" || // TODO: Handle better
+							module.getStatus() === "ERROR"
+						)
+							module
+								.shutdown()
+								.then(() => next())
+								.catch(next);
+					},
+					err => {
+						if (err) reject(err);
+						else resolve();
+					}
+				);
+			else resolve();
+		});
 	}
 
 	/**
@@ -163,8 +198,7 @@ export default class ModuleManager {
 	): Promise<ReturnType> {
 		const [payload, options] = params;
 		return new Promise<ReturnType>((resolve, reject) => {
-			if (!this.modules) return;
-			const module = this.modules[moduleName];
+			const module = this.modules && this.modules[moduleName];
 			if (!module) reject(new Error("Module not found."));
 			else {
 				const jobFunction = module[jobName];

+ 12 - 1
backend/src/main.ts

@@ -39,6 +39,17 @@ process.on("uncaughtException", err => {
 	console.log(`UNCAUGHT EXCEPTION: ${err.stack}`);
 });
 
+const shutdown = () => {
+	moduleManager
+		.shutdown()
+		.then(() => process.exit(0))
+		.catch(() => process.exit(1));
+};
+process.on("SIGINT", shutdown);
+process.on("SIGQUIT", shutdown);
+process.on("SIGTERM", shutdown);
+process.on("SIGUSR2", shutdown);
+
 const runCommand = (line: string) => {
 	const [command, ...args] = line.split(" ");
 	switch (command) {
@@ -87,7 +98,7 @@ const runCommand = (line: string) => {
 			break;
 		}
 		default: {
-			if (!/^\s*$/.test(command))
+			if (!/^\s*$/.test(command) && command !== "rs")
 				console.log(`Command "${command}" not found`);
 		}
 	}

+ 11 - 6
backend/src/modules/OtherModule.ts

@@ -15,12 +15,17 @@ export default class OtherModule extends BaseModule {
 	/**
 	 * startup - Startup other module
 	 */
-	public override startup(): void {
-		super.startup();
-
-		console.log("Other Startup");
-
-		super.started();
+	public override startup(): Promise<void> {
+		return new Promise((resolve, reject) => {
+			super
+				.startup()
+				.then(() => {
+					console.log("Other Startup");
+					super.started();
+					resolve();
+				})
+				.catch(err => reject(err));
+		});
 	}
 
 	/**

+ 11 - 6
backend/src/modules/StationModule.ts

@@ -15,12 +15,17 @@ export default class StationModule extends BaseModule {
 	/**
 	 * startup - Startup station module
 	 */
-	public override startup(): void {
-		super.startup();
-
-		console.log("Station Startup");
-
-		super.started();
+	public override startup(): Promise<void> {
+		return new Promise((resolve, reject) => {
+			super
+				.startup()
+				.then(() => {
+					console.log("Station Startup");
+					super.started();
+					resolve();
+				})
+				.catch(err => reject(err));
+		});
 	}
 
 	/**