Browse Source

Many more backend fixes

Kristian Vos 4 years ago
parent
commit
6d85786ce6

+ 47 - 22
backend/core.js

@@ -15,6 +15,7 @@ class QueueTask {
 	constructor(job, priority) {
 		this.job = job;
 		this.priority = priority;
+		this.job.setTask(this);
 	}
 }
 
@@ -41,7 +42,9 @@ class Queue {
 	 */
 	resume() {
 		this.paused = false;
-		this._handleQueue();
+		setTimeout(() => {
+			this._handleQueue();
+		}, 0);
 	}
 
 	/**
@@ -113,14 +116,18 @@ class Queue {
 	 * Check if there's room for a job to be processed, and if there is, run it.
 	 */
 	_handleQueue() {
-		if (!this.paused && this.runningTasks.length < this.concurrency && this.queue.length > 0) {
+		if (this.queue.length > 0) {
 			const task = this.queue.reduce((a, b) => (a.priority < b.priority ? b : a));
-			this.queue.remove(task);
-			this.runningTasks.push(task);
-			this._handleTask(task);
-			setTimeout(() => {
-				this._handleQueue();
-			}, 0);
+			if (task) {
+				if ((!this.paused && this.runningTasks.length < this.concurrency) || task.priority === -1) {
+					this.queue.remove(task);
+					this.runningTasks.push(task);
+					this._handleTask(task);
+					setTimeout(() => {
+						this._handleQueue();
+					}, 0);
+				}
+			}
 		}
 	}
 
@@ -155,6 +162,7 @@ class Job {
 			return v.toString(16);
 		});
 		this.status = "INITIALIZED";
+		this.task = null;
 	}
 
 	/**
@@ -176,6 +184,10 @@ class Job {
 		this.status = status;
 	}
 
+	setTask(task) {
+		this.task = task;
+	}
+
 	/**
 	 * Returns the UUID of the job, allowing you to compare jobs with toString
 	 *
@@ -372,25 +384,33 @@ export default class CoreClass {
 	 * @param {string} name - the name of the job e.g. GET_PLAYLIST
 	 * @param {object} payload - any expected payload for the job itself
 	 * @param {object} parentJob - the parent job, if any
+	 * @param {number} priority - custom priority. Optional.
 	 * @returns {Promise} - returns a promise
 	 */
-	runJob(name, payload, parentJob) {
+	runJob(name, payload, parentJob, priority) {
 		const deferredPromise = new DeferredPromise();
 		const job = new Job(name, payload, deferredPromise, this, parentJob);
 		this.log("INFO", `Queuing job ${name} (${job.toString()})`);
 		if (parentJob) {
-			this.log(
-				"INFO",
-				`Pausing job ${parentJob.name} (${parentJob.toString()}) since a child job has to run first`
-			);
 			parentJob.addChildJob(job);
-			parentJob.setStatus("WAITING_ON_CHILD_JOB");
-			parentJob.module.jobQueue.pauseRunningJob(parentJob);
-			// console.log(111, parentJob.module.jobQueue.length());
-			// console.log(
-			// 	222,
-			// 	parentJob.module.jobQueue.workersList().map(data => data.data.job)
-			// );
+			if (parentJob.status === "RUNNING") {
+				this.log(
+					"INFO",
+					`Pausing job ${parentJob.name} (${parentJob.toString()}) since a child job has to run first`
+				);
+				parentJob.setStatus("WAITING_ON_CHILD_JOB");
+				parentJob.module.jobQueue.pauseRunningJob(parentJob);
+				// console.log(111, parentJob.module.jobQueue.length());
+				// console.log(
+				// 	222,
+				// 	parentJob.module.jobQueue.workersList().map(data => data.data.job)
+				// );
+			} else {
+				this.log(
+					"INFO",
+					`Not pausing job ${parentJob.name} (${parentJob.toString()}) since it's already paused`
+				);
+			}
 		}
 
 		// console.log(this);
@@ -410,8 +430,13 @@ export default class CoreClass {
 
 		// if (options.bypassQueue) this._runJob(job, options, () => {});
 		// else {
-		const priority = this.priorities[name] ? this.priorities[name] : 10;
-		this.jobQueue.push(job, priority);
+		const _priority = Math.min(
+			priority || Infinity,
+			parentJob ? parentJob.task.priority : Infinity,
+			this.priorities[name] ? this.priorities[name] : 10
+		);
+		console.log(_priority);
+		this.jobQueue.push(job, _priority);
 		// }
 
 		return deferredPromise.promise;

+ 4 - 2
backend/index.js

@@ -328,7 +328,8 @@ class ModuleManager {
 
 			this.log(
 				"INFO",
-				`Initialized: ${Object.keys(this.modules).length - this.modulesNotInitialized.length}/${Object.keys(this.modules).length
+				`Initialized: ${Object.keys(this.modules).length - this.modulesNotInitialized.length}/${
+					Object.keys(this.modules).length
 				}.`
 			);
 
@@ -426,7 +427,8 @@ process.stdin.on("data", data => {
 			console.log(
 				`${moduleName.toUpperCase()}${Array(tabsNeeded).join(
 					"\t"
-				)}${module.getStatus()}. Jobs in queue: ${module.jobQueue.lengthQueue()}. Jobs in progress: ${module.jobQueue.lengthRunning()}. Concurrency: ${module.jobQueue.concurrency
+				)}${module.getStatus()}. Jobs in queue: ${module.jobQueue.lengthQueue()}. Jobs in progress: ${module.jobQueue.lengthRunning()}. Concurrency: ${
+					module.jobQueue.concurrency
 				}. Stage: ${module.getStage()}`
 			);
 		});

+ 1 - 1
backend/logic/actions/hooks/loginRequired.js

@@ -29,7 +29,7 @@ export default destination => (session, ...args) => {
 				console.log("LOGIN_REQUIRED", `User failed to pass login required check.`);
 				return cb({ status: "failure", message: err });
 			}
-			console.log("LOGIN_REQUIRED", `User "${session.userId}" passed login required check.`, false);
+			console.log("LOGIN_REQUIRED", `User "${session.userId}" passed login required check.`);
 			return destination(session, ...args);
 		}
 	);

+ 0 - 3
backend/logic/actions/stations.js

@@ -1,5 +1,4 @@
 import async from "async";
-import underscore from "underscore";
 
 import { isLoginRequired, isOwnerRequired } from "./hooks";
 import db from "../db";
@@ -11,8 +10,6 @@ import notifications from "../notifications";
 import stations from "../stations";
 import activities from "../activities";
 
-const { _ } = underscore;
-
 // const logger = moduleManager.modules["logger"];
 
 const userList = {};

+ 4 - 2
backend/logic/actions/users.js

@@ -118,7 +118,7 @@ cache.runJob("SUB", {
 	}
 });
 
-export default {
+const thisExport = {
 	/**
 	 * Lists all Users
 	 *
@@ -392,7 +392,7 @@ export default {
 					return cb({ status: "failure", message: err });
 				}
 
-				return module.exports.login(session, email, password, result => {
+				return thisExport.login(session, email, password, result => {
 					const obj = {
 						status: "success",
 						message: "Successfully registered."
@@ -1897,3 +1897,5 @@ export default {
 		);
 	})
 };
+
+export default thisExport;

+ 3 - 1
backend/logic/cache/index.js

@@ -138,9 +138,11 @@ class _CacheModule extends CoreClass {
 		return new Promise((resolve, reject) => {
 			let { key } = payload;
 
+			if (!key) return reject(new Error("Invalid key!"));
+			if (!payload.table) return reject(new Error("Invalid table!"));
 			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
 
-			CacheModule.client.hget(payload.table, key, (err, value) => {
+			return CacheModule.client.hget(payload.table, key, (err, value) => {
 				if (err) return reject(new Error(err));
 				try {
 					value = JSON.parse(value);

+ 49 - 41
backend/logic/io.js

@@ -82,9 +82,11 @@ class _IOModule extends CoreClass {
 						},
 
 						next => {
-							CacheModule.runJob("HGET", { table: "sessions", key: SID }).then(session => {
-								next(null, session);
-							});
+							CacheModule.runJob("HGET", { table: "sessions", key: SID })
+								.then(session => {
+									next(null, session);
+								})
+								.catch(next);
 						},
 
 						(session, next) => {
@@ -230,7 +232,9 @@ class _IOModule extends CoreClass {
 								});
 							} else socket.emit("ready", false);
 						})
-						.catch(() => socket.emit("ready", false));
+						.catch(() => {
+							socket.emit("ready", false);
+						});
 				} else socket.emit("ready", false);
 
 				// have the socket listen for each action
@@ -259,52 +263,56 @@ class _IOModule extends CoreClass {
 							}
 							this.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
 
+							let failedGettingSession = false;
 							// load the session from the cache
-							CacheModule.runJob("HGET", {
-								table: "sessions",
-								key: socket.session.sessionId
-							})
-								.then(session => {
-									// make sure the sockets sessionId isn't set if there is no session
-									if (socket.session.sessionId && session === null) delete socket.session.sessionId;
-
-									try {
-										// call the action, passing it the session, and the arguments socket.io passed us
-										return actions[namespace][action].apply(
-											null,
-											[socket.session].concat(args).concat([
-												result => {
-													this.log(
-														"INFO",
-														"IO_ACTION",
-														`Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
-													);
-													// respond to the socket with our message
-													if (typeof cb === "function") cb(result);
-												}
-											])
-										);
-									} catch (err) {
+							if (socket.session.sessionId)
+								await CacheModule.runJob("HGET", {
+									table: "sessions",
+									key: socket.session.sessionId
+								})
+									.then(session => {
+										// make sure the sockets sessionId isn't set if there is no session
+										if (socket.session.sessionId && session === null)
+											delete socket.session.sessionId;
+									})
+									.catch(() => {
+										failedGettingSession = true;
 										if (typeof cb === "function")
 											cb({
 												status: "error",
-												message: "An error occurred while executing the specified action."
+												message: "An error occurred while obtaining your session"
 											});
-
-										return this.log(
-											"ERROR",
-											"IO_ACTION_ERROR",
-											`Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
-										);
-									}
-								})
-								.catch(() => {
+									});
+							if (!failedGettingSession)
+								try {
+									// call the action, passing it the session, and the arguments socket.io passed us
+									actions[namespace][action].apply(
+										null,
+										[socket.session].concat(args).concat([
+											result => {
+												this.log(
+													"INFO",
+													"IO_ACTION",
+													`Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
+												);
+												// respond to the socket with our message
+												if (typeof cb === "function") cb(result);
+											}
+										])
+									);
+								} catch (err) {
 									if (typeof cb === "function")
 										cb({
 											status: "error",
-											message: "An error occurred while obtaining your session"
+											message: "An error occurred while executing the specified action."
 										});
-								});
+
+									this.log(
+										"ERROR",
+										"IO_ACTION_ERROR",
+										`Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
+									);
+								}
 						});
 					});
 				});

+ 19 - 38
backend/logic/stations.js

@@ -154,15 +154,15 @@ class _StationsModule extends CoreClass {
 											StationsModule.runJob(
 												"INITIALIZE_STATION",
 												{
-													stationId: station._id,
-													bypassQueue: true
+													stationId: station._id
 												},
-												{ bypassQueue: true }
+												null,
+												-1
 											)
 												.then(() => {
 													next();
 												})
-												.catch(next); // bypassQueue is true because otherwise the module will never initialize
+												.catch(next);
 										}
 									],
 									err => {
@@ -193,7 +193,6 @@ class _StationsModule extends CoreClass {
 	 *
 	 * @param {object} payload - object that contains the payload
 	 * @param {string} payload.stationId - id of the station to initialise
-	 * @param {boolean} payload.bypassQueue - UNKNOWN
 	 * @returns {Promise} - returns a promise (resolve, reject)
 	 */
 	INITIALIZE_STATION(payload) {
@@ -206,10 +205,8 @@ class _StationsModule extends CoreClass {
 						StationsModule.runJob(
 							"GET_STATION",
 							{
-								stationId: payload.stationId,
-								bypassQueue: payload.bypassQueue
+								stationId: payload.stationId
 							},
-							{ bypassQueue: payload.bypassQueue },
 							this
 						)
 							.then(station => {
@@ -259,8 +256,7 @@ class _StationsModule extends CoreClass {
 							return StationsModule.runJob(
 								"SKIP_STATION",
 								{
-									stationId: station._id,
-									bypassQueue: payload.bypassQueue
+									stationId: station._id
 								},
 								this
 							)
@@ -280,8 +276,7 @@ class _StationsModule extends CoreClass {
 							return StationsModule.runJob(
 								"SKIP_STATION",
 								{
-									stationId: station._id,
-									bypassQueue: payload.bypassQueue
+									stationId: station._id
 								},
 								this
 							)
@@ -325,12 +320,10 @@ class _StationsModule extends CoreClass {
 	 *
 	 * @param {object} payload - object that contains the payload
 	 * @param {string} payload.station - station object to calculate song for
-	 * @param {boolean} payload.bypassQueue - UNKNOWN
 	 * @returns {Promise} - returns a promise (resolve, reject)
 	 */
 	async CALCULATE_SONG_FOR_STATION(payload) {
 		// station, bypassValidate = false
-		const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
 		const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
 
 		return new Promise((resolve, reject) => {
@@ -388,8 +381,7 @@ class _StationsModule extends CoreClass {
 							"CALCULATE_OFFICIAL_PLAYLIST_LIST",
 							{
 								stationId: payload.station._id,
-								songList: playlist,
-								bypassQueue: payload.bypassQueue
+								songList: playlist
 							},
 							this
 						)
@@ -400,7 +392,7 @@ class _StationsModule extends CoreClass {
 					},
 
 					(playlist, next) => {
-						stationModel.updateOne(
+						StationsModule.stationModel.updateOne(
 							{ _id: payload.station._id },
 							{ $set: { playlist } },
 							{ runValidators: true },
@@ -408,8 +400,7 @@ class _StationsModule extends CoreClass {
 								StationsModule.runJob(
 									"UPDATE_STATION",
 									{
-										stationId: payload.station._id,
-										bypassQueue: payload.bypassQueue
+										stationId: payload.station._id
 									},
 									this
 								)
@@ -457,7 +448,7 @@ class _StationsModule extends CoreClass {
 
 					(station, next) => {
 						if (station) return next(true, station);
-						return this.stationModel.findOne({ _id: payload.stationId }, next);
+						return StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
 					},
 
 					(station, next) => {
@@ -506,13 +497,11 @@ class _StationsModule extends CoreClass {
 	 * @returns {Promise} - returns a promise (resolve, reject)
 	 */
 	async GET_STATION_BY_NAME(payload) {
-		const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
-
 		return new Promise((resolve, reject) =>
 			async.waterfall(
 				[
 					next => {
-						stationModel.findOne({ name: payload.stationName }, next);
+						StationsModule.stationModel.findOne({ name: payload.stationName }, next);
 					},
 
 					(station, next) => {
@@ -555,7 +544,7 @@ class _StationsModule extends CoreClass {
 			async.waterfall(
 				[
 					next => {
-						this.stationModel.findOne({ _id: payload.stationId }, next);
+						StationsModule.stationModel.findOne({ _id: payload.stationId }, next);
 					},
 
 					(station, next) => {
@@ -662,7 +651,6 @@ class _StationsModule extends CoreClass {
 	 *
 	 * @param {object} payload - object that contains the payload
 	 * @param {string} payload.stationId - the id of the station to skip
-	 * @param {boolean} payload.bypassQueue - UNKNOWN
 	 * @returns {Promise} - returns a promise (resolve, reject)
 	 */
 	SKIP_STATION(payload) {
@@ -677,8 +665,7 @@ class _StationsModule extends CoreClass {
 						StationsModule.runJob(
 							"GET_STATION",
 							{
-								stationId: payload.stationId,
-								bypassQueue: payload.bypassQueue
+								stationId: payload.stationId
 							},
 							this
 						)
@@ -698,7 +685,7 @@ class _StationsModule extends CoreClass {
 							// Community station with party mode enabled and songs in the queue
 							if (station.paused) return next(null, null, -19, station);
 
-							return this.stationModel.updateOne(
+							return StationsModule.stationModel.updateOne(
 								{ _id: payload.stationId },
 								{
 									$pull: {
@@ -772,11 +759,7 @@ class _StationsModule extends CoreClass {
 						}
 
 						if (station.type === "official" && station.playlist.length === 0) {
-							return StationsModule.runJob(
-								"CALCULATE_SONG_FOR_STATION",
-								{ station, bypassQueue: payload.bypassQueue },
-								this
-							)
+							return StationsModule.runJob("CALCULATE_SONG_FOR_STATION", { station }, this)
 								.then(playlist => {
 									if (playlist.length === 0) return next(null, this.defaultSong, 0, station);
 
@@ -815,8 +798,7 @@ class _StationsModule extends CoreClass {
 										StationsModule.runJob(
 											"CALCULATE_SONG_FOR_STATION",
 											{
-												station,
-												bypassQueue: payload.bypassQueue
+												station
 											},
 											this
 										)
@@ -873,12 +855,11 @@ class _StationsModule extends CoreClass {
 					},
 
 					($set, station, next) => {
-						this.stationModel.updateOne({ _id: station._id }, { $set }, () => {
+						StationsModule.stationModel.updateOne({ _id: station._id }, { $set }, () => {
 							StationsModule.runJob(
 								"UPDATE_STATION",
 								{
-									stationId: station._id,
-									bypassQueue: payload.bypassQueue
+									stationId: station._id
 								},
 								this
 							)

+ 0 - 4
backend/logic/utils.js

@@ -133,10 +133,6 @@ class _UtilsModule extends CoreClass {
 
 		const promises = [];
 		for (let i = 0; i < payload.length; i += 1) {
-			this.log(
-				"ERROR",
-				"CHECK THIS!?!?!?!??!?!?!?!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
-			);
 			promises.push(
 				UtilsModule.runJob(
 					"GET_RANDOM_NUMBER",