Browse Source

feat: worked further on the long job system

Kristian Vos 1 year ago
parent
commit
a9b5c93cc0

+ 42 - 20
backend/core.js

@@ -191,6 +191,9 @@ class Job {
 		this.lastProgressData = null;
 		this.lastProgressTime = Date.now();
 		this.lastProgressTimeout = null;
+		this.longJob = false;
+		this.longJobTitle = "";
+		this.longJobStatus = "";
 	}
 
 	/**
@@ -264,31 +267,50 @@ class Job {
 		this.module.log(...args);
 	}
 
+	keepLongJob() {
+		this.longJob = true;
+	}
+
+	forgetLongJob() {
+		this.longJob = false;
+		this.module.moduleManager.jobManager.removeJob(this);
+	}
+
 	/**
 	 * 
 	 * @param {data} data - Data to publish upon progress
 	 */
-	publishProgress(data) {
-		if (this.onProgress) {
-			this.lastProgressData = data;
-			if (data.status === "update") {
-				if ((Date.now() - this.lastProgressTime) > 1000) {
-					this.lastProgressTime = Date.now();
+	publishProgress(data, notALongJob) {
+		if (this.longJob || notALongJob) {
+			if (this.onProgress) {
+				if (notALongJob) {
+					this.onProgress.emit("progress", data);
 				} else {
-					if (this.lastProgressTimeout) clearTimeout(this.lastProgressTimeout);
-					this.lastProgressTimeout = setTimeout(() => {
-						this.lastProgressTime = Date.now();
-						this.lastProgressTimeout = null;
-						this.onProgress.emit("progress", data);
-					}, Date.now() - this.lastProgressTime);
-					return;
+					this.lastProgressData = data;
+
+					if (data.status === "update") {
+						if ((Date.now() - this.lastProgressTime) > 1000) {
+							this.lastProgressTime = Date.now();
+						} else {
+							if (this.lastProgressTimeout) clearTimeout(this.lastProgressTimeout);
+							this.lastProgressTimeout = setTimeout(() => {
+								this.lastProgressTime = Date.now();
+								this.lastProgressTimeout = null;
+								this.onProgress.emit("progress", data);
+							}, Date.now() - this.lastProgressTime);
+							return;
+						}
+					} else if (data.status === "success" || data.status === "error")
+						if (this.lastProgressTimeout) clearTimeout(this.lastProgressTimeout);
+
+					if (data.title)	this.longJobTitle = data.title;
+
+					this.onProgress.emit("progress", data);
 				}
-			} else if (data.status === "success" || data.status === "error")
-				if (this.lastProgressTimeout) clearTimeout(this.lastProgressTimeout);
-			
-
-			this.onProgress.emit("progress", data);
-		} else this.log("Progress published, but no onProgress specified.")
+			} else this.log("Progress published, but no onProgress specified.")
+		} else {
+			this.parentJob.publishProgress(data);
+		}
 	}
 }
 
@@ -636,7 +658,7 @@ export default class CoreClass {
 							const executionTime = endTime - startTime;
 							this.jobStatistics[job.name].total += 1;
 							this.jobStatistics[job.name].averageTiming.update(executionTime);
-							this.moduleManager.jobManager.removeJob(job);
+							if (!job.longJob) this.moduleManager.jobManager.removeJob(job);
 							job.cleanup();
 
 							if (!job.parentJob) {

+ 9 - 8
backend/index.js

@@ -72,17 +72,17 @@ if (config.debug && config.debug.traceUnhandledPromises === true) {
 class JobManager {
 	// eslint-disable-next-line require-jsdoc
 	constructor() {
-		this.runningJobs = {};
+		this.jobs = {};
 	}
 
 	/**
-	 * Adds a job to the list of running jobs
+	 * Adds a job to the list of jobs
 	 *
 	 * @param {object} job - the job object
 	 */
 	addJob(job) {
-		if (!this.runningJobs[job.module.name]) this.runningJobs[job.module.name] = {};
-		this.runningJobs[job.module.name][job.toString()] = job;
+		if (!this.jobs[job.module.name]) this.jobs[job.module.name] = {};
+		this.jobs[job.module.name][job.toString()] = job;
 	}
 
 	/**
@@ -91,8 +91,9 @@ class JobManager {
 	 * @param {object} job - the job object
 	 */
 	removeJob(job) {
-		if (!this.runningJobs[job.module.name]) this.runningJobs[job.module.name] = {};
-		delete this.runningJobs[job.module.name][job.toString()];
+		return;
+		if (!this.jobs[job.module.name]) this.jobs[job.module.name] = {};
+		delete this.jobs[job.module.name][job.toString()];
 	}
 
 	/**
@@ -103,8 +104,8 @@ class JobManager {
 	 */
 	getJob(uuid) {
 		let job = null;
-		Object.keys(this.runningJobs).forEach(moduleName => {
-			if (this.runningJobs[moduleName][uuid]) job = this.runningJobs[moduleName][uuid];
+		Object.keys(this.jobs).forEach(moduleName => {
+			if (this.jobs[moduleName][uuid]) job = this.jobs[moduleName][uuid];
 		});
 		return job;
 	}

+ 5 - 1
backend/logic/actions/songs.js

@@ -1311,10 +1311,14 @@ export default {
 	editTags: isAdminRequired(async function editTags(session, method, tags, songIds, cb) {
 		const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
 
+		this.keepLongJob();
 		this.publishProgress({
 			status: "started",
-			message: "Updating tags."
+			title: "Bulk editing tags",
+			message: "Updating tags.",
+			id: this.toString()
 		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
 
 		async.waterfall(
 			[

+ 131 - 0
backend/logic/actions/users.js

@@ -1606,6 +1606,137 @@ export default {
 		);
 	},
 
+	/**
+	 * Updates the order of a user's playlists
+	 *
+	 * @param {object} session - the session object automatically added by the websocket
+	 * @param {Function} cb - gets called with the result
+	 */
+	getLongJobs: isLoginRequired(async function getLongJobs(session, cb) {
+		async.waterfall(
+			[
+				next => {
+					CacheModule.runJob(
+						"LRANGE",
+						{
+							key: `longJobs.${session.userId}`
+						},
+						this
+					)
+						.then(longJobUuids => next(null, longJobUuids))
+						.catch(next);
+				},
+
+				(longJobUuids, next) => {
+					next(
+						null,
+						longJobUuids
+							.map(longJobUuid => moduleManager.jobManager.getJob(longJobUuid))
+							.filter(longJob => !!longJob)
+					);
+				},
+
+				(longJobs, next) => {
+					longJobs.forEach(longJob => {
+						longJob.onProgress.on("progress", data => {
+							this.publishProgress(
+								{
+									id: longJob.toString(),
+									...data
+								},
+								true
+							);
+						});
+					});
+
+					next(
+						null,
+						longJobs.map(longJob => ({
+							id: longJob.toString(),
+							name: longJob.longJobTitle,
+							status: longJob.lastProgressData.status,
+							message: longJob.lastProgressData.message
+						}))
+					);
+				}
+			],
+			async (err, longJobs) => {
+				if (err) {
+					err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
+
+					this.log("ERROR", "GET_LONG_JOBS", `Couldn't get long jobs for user "${session.userId}". "${err}"`);
+
+					return cb({ status: "error", message: err });
+				}
+
+				this.log("SUCCESS", "GET_LONG_JOBS", `Got long jobs for user "${session.userId}".`);
+
+				return cb({
+					status: "success",
+					data: {
+						longJobs
+					}
+				});
+			}
+		);
+	}),
+
+	/**
+	 * Updates the order of a user's playlists
+	 *
+	 * @param {object} session - the session object automatically added by the websocket
+	 * @param {string} jobId - array of playlist ids (with a specific order)
+	 * @param {Function} cb - gets called with the result
+	 */
+	removeLongJob: isLoginRequired(async function removeLongJob(session, jobId, cb) {
+		async.waterfall(
+			[
+				next => {
+					CacheModule.runJob(
+						"LREM",
+						{
+							key: `longJobs.${session.userId}`,
+							value: jobId
+						},
+						this
+					)
+						.then(() => next())
+						.catch(next);
+				},
+
+				next => {
+					const job = moduleManager.jobManager.getJob(jobId);
+					if (job && job.status === "FINISHED") job.forgetLongJob();
+					next();
+				}
+			],
+			async err => {
+				if (err) {
+					err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
+
+					this.log(
+						"ERROR",
+						"REMOVE_LONG_JOB",
+						`Couldn't remove long job for user "${session.userId}" with id ${jobId}. "${err}"`
+					);
+
+					return cb({ status: "error", message: err });
+				}
+
+				this.log(
+					"SUCCESS",
+					"REMOVE_LONG_JOB",
+					`Removed long job for user "${session.userId}" with id ${jobId}.`
+				);
+
+				return cb({
+					status: "success",
+					message: "Removed long job successfully."
+				});
+			}
+		);
+	}),
+
 	/**
 	 * Gets a user from a userId
 	 *

+ 78 - 0
backend/logic/cache/index.js

@@ -331,6 +331,84 @@ class _CacheModule extends CoreClass {
 		});
 	}
 
+	/**
+	 * Gets a full list from Redis
+	 *
+	 * @param {object} payload - object containing payload
+	 * @param {string} payload.key - name of the table to get the value from (table === redis hash)
+	 * @returns {Promise} - returns a promise (resolve, reject)
+	 */
+	LRANGE(payload) {
+		return new Promise((resolve, reject) => {
+			let { key } = payload;
+
+			if (!key) {
+				reject(new Error("Invalid key!"));
+				return;
+			}
+			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
+
+			CacheModule.client.LRANGE(key, 0, -1, (err, list) => {
+				if (err) {
+					reject(new Error(err));
+					return;
+				}
+
+				resolve(list);
+			});
+		});
+	}
+
+	/**
+	 * Adds a value to a list in Redis
+	 *
+	 * @param {object} payload - object containing payload
+	 * @param {string} payload.key -  name of the list
+	 * @param {*} payload.value - the value we want to set
+	 * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
+	 * @returns {Promise} - returns a promise (resolve, reject)
+	 */
+	RPUSH(payload) {
+		return new Promise((resolve, reject) => {
+			let { key } = payload;
+			let { value } = payload;
+
+			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
+			// automatically stringify objects and arrays into JSON
+			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
+
+			CacheModule.client.RPUSH(key, value, err => {
+				if (err) return reject(new Error(err));
+				return resolve();
+			});
+		});
+	}
+
+	/**
+	 * Removes a value from a list in Redis
+	 *
+	 * @param {object} payload - object containing payload
+	 * @param {string} payload.key -  name of the list
+	 * @param {*} payload.value - the value we want to remove
+	 * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array
+	 * @returns {Promise} - returns a promise (resolve, reject)
+	 */
+	LREM(payload) {
+		return new Promise((resolve, reject) => {
+			let { key } = payload;
+			let { value } = payload;
+
+			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();
+			// automatically stringify objects and arrays into JSON
+			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);
+
+			CacheModule.client.LREM(key, 1, value, err => {
+				if (err) return reject(new Error(err));
+				return resolve();
+			});
+		});
+	}
+
 	/**
 	 * Returns a redis schema
 	 *

+ 24 - 4
frontend/src/components/LongJobs.vue

@@ -85,7 +85,7 @@
 </template>
 
 <script>
-import { mapState, mapActions } from "vuex";
+import { mapState, mapActions, mapGetters } from "vuex";
 
 import FloatingBox from "@/components/FloatingBox.vue";
 
@@ -102,14 +102,34 @@ export default {
 	computed: {
 		...mapState("longJobs", {
 			activeJobs: state => state.activeJobs
+		}),
+		...mapGetters({
+			socket: "websockets/getSocket"
 		})
 	},
+	mounted() {
+		this.socket.dispatch("users.getLongJobs", {
+			cb: res => {
+				if (res.status === "success") {
+					this.setJobs(res.data.longJobs);
+				} else console.log(res.message);
+			},
+			onProgress: res => {
+				this.setJob(res);
+			}
+		});
+	},
 	methods: {
 		remove(job) {
-			if (job.status === "success" || job.status === "error")
-				this.removeJob(job.id);
+			if (job.status === "success" || job.status === "error") {
+				this.socket.dispatch("users.removeLongJob", job.id, res => {
+					if (res.status === "success") {
+						this.removeJob(job.id);
+					} else console.log(res.message);
+				});
+			}
 		},
-		...mapActions("longJobs", ["removeJob"])
+		...mapActions("longJobs", ["setJob", "setJobs", "removeJob"])
 	}
 };
 </script>

+ 14 - 7
frontend/src/components/modals/BulkActions.vue

@@ -132,7 +132,8 @@ export default {
 		},
 		applyChanges() {
 			let toast;
-			const id = Date.now();
+			let id;
+			let title;
 
 			this.socket.dispatch(
 				this.type.action,
@@ -164,12 +165,18 @@ export default {
 							}, 4000);
 						}
 
-						if (res.status === "started") this.closeCurrentModal();
-						this.setJob({
-							id,
-							name: `Bulk ${this.method} ${this.type.name}`,
-							...res
-						});
+						if (res.status === "started") {
+							id = res.id;
+							title = res.title;
+							this.closeCurrentModal();
+						}
+
+						if (id)
+							this.setJob({
+								id,
+								name: title,
+								...res
+							});
 					}
 				}
 			);

+ 10 - 6
frontend/src/store/modules/longJobs.js

@@ -2,12 +2,12 @@
 
 const state = {
 	activeJobs: [
-		{
-			id: 1,
-			name: "test",
-			status: "success",
-			message: "test"
-		}
+		// {
+		// 	id: 1,
+		// 	name: "test",
+		// 	status: "success",
+		// 	message: "test"
+		// }
 	]
 };
 
@@ -15,6 +15,7 @@ const getters = {};
 
 const actions = {
 	setJob: ({ commit }, job) => commit("setJob", job),
+	setJobs: ({ commit }, jobs) => commit("setJobs", jobs),
 	removeJob: ({ commit }, job) => commit("removeJob", job)
 };
 
@@ -37,6 +38,9 @@ const mutations = {
 				}
 			});
 	},
+	setJobs(state, jobs) {
+		state.activeJobs = jobs;
+	},
 	removeJob(state, jobId) {
 		state.activeJobs.forEach((activeJob, index) => {
 			if (activeJob.id === jobId) {