浏览代码

feat(job system): jobs can now have options e.g. isQuiet

Signed-off-by: Jonathan <theflametrooper@gmail.com>
Jonathan 4 年之前
父节点
当前提交
386569a8d0
共有 3 个文件被更改,包括 186 次插入132 次删除
  1. 64 26
      backend/core.js
  2. 115 104
      backend/logic/actions/stations.js
  3. 7 2
      backend/logic/io.js

+ 64 - 26
backend/core.js

@@ -12,8 +12,9 @@ class DeferredPromise {
 
 class QueueTask {
 	// eslint-disable-next-line require-jsdoc
-	constructor(job, priority) {
+	constructor(job, options, priority) {
 		this.job = job;
+		this.options = options;
 		this.priority = priority;
 		this.job.setTask(this);
 	}
@@ -78,10 +79,11 @@ class Queue {
 	 * Adds a job to the queue, with a given priority.
 	 *
 	 * @param {object} job - the job that is to be added
+	 * @param {object} options - custom options e.g. isQuiet. Optional.
 	 * @param {number} priority - the priority of the to be added job
 	 */
-	push(job, priority) {
-		this.queue.push(new QueueTask(job, priority));
+	push(job, options, priority) {
+		this.queue.push(new QueueTask(job, options, priority));
 		setTimeout(() => {
 			this._handleQueue();
 		}, 0);
@@ -159,7 +161,7 @@ class Queue {
 	 * @param {object} task - the task to be handled
 	 */
 	_handleTask(task) {
-		this.handleTaskFunction(task.job).finally(() => {
+		this.handleTaskFunction(task.job, task.options).finally(() => {
 			this.runningTasks.remove(task);
 			this._handleQueue();
 		});
@@ -206,6 +208,11 @@ class Job {
 		this.status = status;
 	}
 
+	/**
+	 * Sets the task for a job
+	 *
+	 * @param {string} task - the job task
+	 */
 	setTask(task) {
 		this.task = task;
 	}
@@ -280,7 +287,7 @@ export default class CoreClass {
 		this.name = name;
 		this.status = "UNINITIALIZED";
 		// this.log("Core constructor");
-		this.jobQueue = new Queue(job => this._runJob(job), 10);
+		this.jobQueue = new Queue((job, options) => this._runJob(job, options), 10);
 		this.jobQueue.pause();
 		this.runningJobs = [];
 		this.priorities = {};
@@ -407,48 +414,78 @@ export default class CoreClass {
 	 * @param {object} payload - any expected payload for the job itself
 	 * @param {object} parentJob - the parent job, if any
 	 * @param {number} priority - custom priority. Optional.
+	 * @param {object} options - custom options e.g. isQuiet. Optional.
 	 * @returns {Promise} - returns a promise
 	 */
-	runJob(name, payload, parentJob, priority) {
+	runJob(name, payload, parentJob, priority, options) {
+		/** Allows for any combination of optional parameters (parentJob, priority, options) */
+
+		let _options;
+		let _priority;
+		let _parentJob;
+
+		if (parentJob) {
+			if (typeof parentJob === "object")
+				if (!parentJob.name) _options = parentJob;
+				else _parentJob = parentJob;
+			else if (typeof parentJob === "number") _priority = parentJob;
+		}
+
+		if (options) {
+			if (typeof options === "object")
+				if (options.name) _parentJob = options;
+				else _options = options;
+			if (typeof options === "number") _priority = options;
+		}
+
+		if (priority && typeof priority === "object") {
+			if (!priority.name) _options = priority;
+			else _parentJob = priority;
+		} else _priority = priority;
+
+		if (!_options) _options = { isQuiet: false };
+
 		const deferredPromise = new DeferredPromise();
-		const job = new Job(name, payload, deferredPromise, this, parentJob);
+		const job = new Job(name, payload, deferredPromise, this, _parentJob);
+
 		this.log("INFO", `Queuing job ${name} (${job.toString()})`);
-		if (parentJob) {
-			parentJob.addChildJob(job);
-			if (parentJob.status === "RUNNING") {
+
+		if (_parentJob) {
+			_parentJob.addChildJob(job);
+			if (_parentJob.status === "RUNNING") {
 				this.log(
 					"INFO",
-					`Pausing job ${parentJob.name} (${parentJob.toString()}) since a child job has to run first`
+					`Pausing job ${_parentJob.name} (${_parentJob.toString()}) since a child job has to run first`
 				);
-				parentJob.setStatus("WAITING_ON_CHILD_JOB");
-				parentJob.module.jobQueue.pauseRunningJob(parentJob);
-				// console.log(111, parentJob.module.jobQueue.length());
+				_parentJob.setStatus("WAITING_ON_CHILD_JOB");
+				_parentJob.module.jobQueue.pauseRunningJob(_parentJob);
+				// console.log(111, _parentJob.module.jobQueue.length());
 				// console.log(
 				// 	222,
-				// 	parentJob.module.jobQueue.workersList().map(data => data.data.job)
+				// 	_parentJob.module.jobQueue.workersList().map(data => data.data.job)
 				// );
 			} else {
 				this.log(
 					"INFO",
-					`Not pausing job ${parentJob.name} (${parentJob.toString()}) since it's already paused`
+					`Not pausing job ${_parentJob.name} (${_parentJob.toString()}) since it's already paused`
 				);
 			}
 		}
 
 		// console.log(this);
 
-		// console.log(321, parentJob);
+		// console.log(321, _parentJob);
 
 		job.setStatus("QUEUED");
 
 		// if (options.bypassQueue) this._runJob(job, options, () => {});
 		// else {
-		const _priority = Math.min(
-			priority || Infinity,
-			parentJob ? parentJob.task.priority : Infinity,
+		const calculatedPriority = Math.min(
+			_priority || Infinity,
+			_parentJob ? _parentJob.task.priority : Infinity,
 			this.priorities[name] ? this.priorities[name] : 10
 		);
-		this.jobQueue.push(job, _priority);
+		this.jobQueue.push(job, _options, calculatedPriority);
 
 		if (
 			config.debug &&
@@ -480,10 +517,11 @@ export default class CoreClass {
 	 * @param {string} job.name - the name of the job e.g. GET_PLAYLIST
 	 * @param {string} job.payload - any expected payload for the job itself
 	 * @param {Promise} job.onFinish - deferred promise when the job is complete
+	 * @param {object} options - custom options e.g. isQuiet. Optional.
 	 * @returns {Promise} - returns a promise
 	 */
-	_runJob(job) {
-		this.log("INFO", `Running job ${job.name} (${job.toString()})`);
+	_runJob(job, options) {
+		if (!options.isQuiet) this.log("INFO", `Running job ${job.name} (${job.toString()})`);
 		return new Promise(resolve => {
 			const startTime = Date.now();
 
@@ -492,12 +530,11 @@ export default class CoreClass {
 			this.runningJobs.push(job);
 
 			if (previousStatus === "QUEUED") {
-				this.log("INFO", `Job ${job.name} (${job.toString()}) is queued, so calling it`);
+				if (!options.isQuiet) this.log("INFO", `Job ${job.name} (${job.toString()}) is queued, so calling it`);
 				this[job.name]
 					.apply(job, [job.payload])
 					.then(response => {
-						// if (!options.isQuiet)
-						this.log("INFO", `Ran job ${job.name} (${job.toString()}) successfully`);
+						if (!options.isQuiet) this.log("INFO", `Ran job ${job.name} (${job.toString()}) successfully`);
 						job.setStatus("FINISHED");
 						job.setResponse(response);
 						this.jobStatistics[job.name].successful += 1;
@@ -543,6 +580,7 @@ export default class CoreClass {
 						this.jobStatistics[job.name].total += 1;
 						this.jobStatistics[job.name].averageTiming.update(executionTime);
 						this.runningJobs.splice(this.runningJobs.indexOf(job), 1);
+
 						if (!job.parentJob) {
 							if (job.responseType === "RESOLVE") {
 								job.onFinish.resolve(job.response);

+ 115 - 104
backend/logic/actions/stations.js

@@ -15,112 +15,123 @@ const ActivitiesModule = moduleManager.modules.activities;
 const YouTubeModule = moduleManager.modules.youtube;
 
 const userList = {};
-const usersPerStation = {};
-const usersPerStationCount = {};
+let usersPerStation = {};
+let usersPerStationCount = {};
 
 // Temporarily disabled until the messages in console can be limited
-// setInterval(async () => {
-// 	const stationsCountUpdated = [];
-// 	const stationsUpdated = [];
-
-// 	const oldUsersPerStation = usersPerStation;
-// 	usersPerStation = {};
-
-// 	const oldUsersPerStationCount = usersPerStationCount;
-// 	usersPerStationCount = {};
-
-// 	const userModel = await DBModule.runJob("GET_MODEL", {
-// 		modelName: "user"
-// 	});
-
-// 	async.each(
-// 		Object.keys(userList),
-// 		(socketId, next) => {
-// 			IOModule.runJob("SOCKET_FROM_SESSION", { socketId }, { isQuiet: true }).then(socket => {
-// 				const stationId = userList[socketId];
-// 				if (!socket || Object.keys(socket.rooms).indexOf(`station.${stationId}`) === -1) {
-// 					if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
-// 					if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
-// 					delete userList[socketId];
-// 					return next();
-// 				}
-// 				if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0;
-// 				usersPerStationCount[stationId] += 1;
-// 				if (!usersPerStation[stationId]) usersPerStation[stationId] = [];
-
-// 				return async.waterfall(
-// 					[
-// 						next => {
-// 							if (!socket.session || !socket.session.sessionId) return next("No session found.");
-// 							return CacheModule
-// 								.runJob("HGET", {
-// 									table: "sessions",
-// 									key: socket.session.sessionId
-// 								})
-// 								.then(session => {
-// 									next(null, session);
-// 								})
-// 								.catch(next);
-// 						},
-
-// 						(session, next) => {
-// 							if (!session) return next("Session not found.");
-// 							return userModel.findOne({ _id: session.userId }, next);
-// 						},
-
-// 						(user, next) => {
-// 							if (!user) return next("User not found.");
-// 							if (usersPerStation[stationId].indexOf(user.username) !== -1)
-// 								return next("User already in the list.");
-// 							return next(null, user.username);
-// 						}
-// 					],
-// 					(err, username) => {
-// 						if (!err) {
-// 							usersPerStation[stationId].push(username);
-// 						}
-// 						next();
-// 					}
-// 				);
-// 			});
-// 			// TODO Code to show users
-// 		},
-// 		() => {
-//			Object.keys(usersPerStationCount).forEach(stationId => {
-//				if (oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]) {
-//					if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
-//				}
-//			})
-//
-//			Object.keys(usersPerStation).forEach(stationId => {
-//				if (
-//					_.difference(usersPerStation[stationId], oldUsersPerStation[stationId]).length > 0 ||
-//					_.difference(oldUsersPerStation[stationId], usersPerStation[stationId]).length > 0
-//				) {
-//					if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
-//				}
-//			});
-//
-// 			stationsCountUpdated.forEach(stationId => {
-// 				console.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
-// 				CacheModule.runJob("PUB", {
-// 					table: "station.updateUserCount",
-// 					value: stationId
-// 				});
-// 			});
-
-// 			stationsUpdated.forEach(stationId => {
-// 				console.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
-// 				CacheModule.runJob("PUB", {
-// 					table: "station.updateUsers",
-// 					value: stationId
-// 				});
-// 			});
-
-// 			// console.log("Userlist", usersPerStation);
-// 		}
-// 	);
-// }, 3000);
+setInterval(async () => {
+	const stationsCountUpdated = [];
+	const stationsUpdated = [];
+
+	// const oldUsersPerStation = usersPerStation;
+	usersPerStation = {};
+
+	const oldUsersPerStationCount = usersPerStationCount;
+	usersPerStationCount = {};
+
+	const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, { isQuiet: true });
+
+	async.each(
+		Object.keys(userList),
+		(socketId, next) => {
+			IOModule.runJob("SOCKET_FROM_SESSION", { socketId }, { isQuiet: true }).then(socket => {
+				const stationId = userList[socketId];
+
+				if (!socket || Object.keys(socket.rooms).indexOf(`station.${stationId}`) === -1) {
+					if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
+					if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
+					delete userList[socketId];
+					return next();
+				}
+
+				if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0;
+				usersPerStationCount[stationId] += 1;
+				if (!usersPerStation[stationId]) usersPerStation[stationId] = [];
+
+				return async.waterfall(
+					[
+						next => {
+							if (!socket.session || !socket.session.sessionId) return next("No session found.");
+							return CacheModule.runJob(
+								"HGET",
+								{
+									table: "sessions",
+									key: socket.session.sessionId
+								},
+								{ isQuiet: true }
+							)
+								.then(session => {
+									next(null, session);
+								})
+								.catch(next);
+						},
+
+						(session, next) => {
+							if (!session) return next("Session not found.");
+							return userModel.findOne({ _id: session.userId }, next);
+						},
+
+						(user, next) => {
+							if (!user) return next("User not found.");
+							if (usersPerStation[stationId].indexOf(user.username) !== -1)
+								return next("User already in the list.");
+							return next(null, { username: user.username, avatar: user.avatar });
+						}
+					],
+					(err, user) => {
+						if (!err) {
+							usersPerStation[stationId].push(user);
+						}
+						next();
+					}
+				);
+			});
+			// TODO Code to show users
+		},
+		() => {
+			Object.keys(usersPerStationCount).forEach(stationId => {
+				if (oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]) {
+					if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
+				}
+			});
+
+			// Object.keys(usersPerStation).forEach(stationId => {
+			// 	if (
+			// 		_.difference(usersPerStation[stationId], oldUsersPerStation[stationId]).length > 0 ||
+			// 		_.difference(oldUsersPerStation[stationId], usersPerStation[stationId]).length > 0
+			// 	) {
+			// 		if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
+			// 	}
+			// });
+
+			stationsCountUpdated.forEach(stationId => {
+				console.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
+				CacheModule.runJob(
+					"PUB",
+					{
+						table: "station.updateUserCount",
+						value: stationId
+					},
+					{ isQuiet: true }
+				);
+			});
+
+			stationsUpdated.forEach(stationId => {
+				console.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
+				CacheModule.runJob(
+					"PUB",
+					{
+						table: "station.updateUsers",
+						value: stationId
+					},
+					{ isQuiet: true }
+				);
+			});
+
+			// console.log("Userlist", usersPerStation);
+		}
+	);
+}, 3000);
 
 CacheModule.runJob("SUB", {
 	channel: "station.updateUsers",

+ 7 - 2
backend/logic/io.js

@@ -336,8 +336,13 @@ class _IOModule extends CoreClass {
 		});
 	}
 
-	// UNKNOWN
-	// eslint-disable-next-line require-jsdoc
+	/**
+	 * Returns whether there is a socket for a session id or not
+	 *
+	 * @param {object} payload - object containing the payload
+	 * @param {string} payload.sessionId - user session id
+	 * @returns {Promise} - returns promise (reject, resolve)
+	 */
 	async SOCKET_FROM_SESSION(payload) {
 		// socketId
 		return new Promise((resolve, reject) => {