Browse Source

feat: worked more on long jobs, they now get added/removed in other sessions too

Kristian Vos 2 years ago
parent
commit
0bfcd70dbf

+ 8 - 0
backend/logic/actions/songs.js

@@ -1319,6 +1319,14 @@ export default {
 			id: this.toString()
 		});
 		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
 
 		async.waterfall(
 			[

+ 114 - 2
backend/logic/actions/users.js

@@ -202,6 +202,39 @@ CacheModule.runJob("SUB", {
 	}
 });
 
+CacheModule.runJob("SUB", {
+	channel: "longJob.removed",
+	cb: ({ jobId, userId }) => {
+		WSModule.runJob("SOCKETS_FROM_USER", { userId }).then(sockets => {
+			sockets.forEach(socket => {
+				socket.dispatch("keep.event:longJob.removed", {
+					data: {
+						jobId
+					}
+				});
+			});
+		});
+	}
+});
+
+CacheModule.runJob("SUB", {
+	channel: "longJob.added",
+	cb: ({ jobId, userId }) => {
+		console.log(1111, jobId, userId);
+		WSModule.runJob("SOCKETS_FROM_USER", { userId }).then(sockets => {
+			console.log(2222, sockets.length);
+			sockets.forEach(socket => {
+				console.log(3333);
+				socket.dispatch("keep.event:longJob.added", {
+					data: {
+						jobId
+					}
+				});
+			});
+		});
+	}
+});
+
 export default {
 	/**
 	 * Gets users, used in the admin users page by the AdvancedTable component
@@ -1607,7 +1640,7 @@ export default {
 	},
 
 	/**
-	 * Updates the order of a user's playlists
+	 * Gets a list of long jobs, including onprogress events when those long jobs have progress
 	 *
 	 * @param {object} session - the session object automatically added by the websocket
 	 * @param {Function} cb - gets called with the result
@@ -1683,7 +1716,81 @@ export default {
 	}),
 
 	/**
-	 * Updates the order of a user's playlists
+	 * Gets a specific long job, including onprogress events when that long job has progress
+	 *
+	 * @param {object} session - the session object automatically added by the websocket
+	 * @param {string} jobId - the if id the long job
+	 * @param {Function} cb - gets called with the result
+	 */
+	getLongJob: isLoginRequired(async function getLongJobs(session, jobId, cb) {
+		async.waterfall(
+			[
+				next => {
+					CacheModule.runJob(
+						"LRANGE",
+						{
+							key: `longJobs.${session.userId}`
+						},
+						this
+					)
+						.then(longJobUuids => next(null, longJobUuids))
+						.catch(next);
+				},
+
+				(longJobUuids, next) => {
+					if (longJobUuids.indexOf(jobId) === -1) return next("Long job not found.");
+					const longJob = moduleManager.jobManager.getJob(jobId);
+					if (!longJob) return next("Long job not found.");
+					return next(null, longJob);
+				},
+
+				(longJob, next) => {
+					if (longJob.onProgress)
+						longJob.onProgress.on("progress", data => {
+							this.publishProgress(
+								{
+									id: longJob.toString(),
+									...data
+								},
+								true
+							);
+						});
+
+					next(null, {
+						id: longJob.toString(),
+						name: longJob.longJobTitle,
+						status: longJob.lastProgressData.status,
+						message: longJob.lastProgressData.message
+					});
+				}
+			],
+			async (err, longJob) => {
+				if (err) {
+					err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
+
+					this.log(
+						"ERROR",
+						"GET_LONG_JOB",
+						`Couldn't get long job for user "${session.userId}" with id "${jobId}". "${err}"`
+					);
+
+					return cb({ status: "error", message: err });
+				}
+
+				this.log("SUCCESS", "GET_LONG_JOB", `Got long job for user "${session.userId}" with id "${jobId}".`);
+
+				return cb({
+					status: "success",
+					data: {
+						longJob
+					}
+				});
+			}
+		);
+	}),
+
+	/**
+	 * Removes active long job for a user
 	 *
 	 * @param {object} session - the session object automatically added by the websocket
 	 * @param {string} jobId - array of playlist ids (with a specific order)
@@ -1730,6 +1837,11 @@ export default {
 					`Removed long job for user "${session.userId}" with id ${jobId}.`
 				);
 
+				CacheModule.runJob("PUB", {
+					channel: "longJob.removed",
+					value: { jobId, userId: session.userId }
+				});
+
 				return cb({
 					status: "success",
 					message: "Removed long job successfully."

+ 8 - 0
backend/logic/actions/youtube.js

@@ -368,6 +368,14 @@ export default {
 			id: this.toString()
 		});
 		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
 
 		async.waterfall(
 			[

+ 18 - 0
frontend/src/components/LongJobs.vue

@@ -118,6 +118,24 @@ export default {
 				this.setJob(res);
 			}
 		});
+
+		this.socket.on("keep.event:longJob.removed", ({ data }) => {
+			this.removeJob(data.jobId);
+		});
+
+		this.socket.on("keep.event:longJob.added", ({ data }) => {
+			if (!this.activeJobs.find(activeJob => activeJob.id === data.jobId))
+				this.socket.dispatch("users.getLongJob", data.jobId, {
+					cb: res => {
+						if (res.status === "success") {
+							this.setJob(res.data.longJob);
+						} else console.log(res.message);
+					},
+					onProgress: res => {
+						this.setJob(res);
+					}
+				});
+		});
 	},
 	methods: {
 		remove(job) {

+ 20 - 17
frontend/src/store/modules/longJobs.js

@@ -8,7 +8,8 @@ const state = {
 		// 	status: "success",
 		// 	message: "test"
 		// }
-	]
+	],
+	removedJobIds: []
 };
 
 const getters = {};
@@ -21,22 +22,23 @@ const actions = {
 
 const mutations = {
 	setJob(state, { id, name, status, message }) {
-		if (status === "started")
-			state.activeJobs.push({
-				id,
-				name,
-				status
-			});
-		else
-			state.activeJobs.forEach((activeJob, index) => {
-				if (activeJob.id === id) {
-					state.activeJobs[index] = {
-						...state.activeJobs[index],
-						status,
-						message
-					};
-				}
-			});
+		if (state.removedJobIds.indexOf(id) === -1)
+			if (!state.activeJobs.find(activeJob => activeJob.id === id))
+				state.activeJobs.push({
+					id,
+					name,
+					status
+				});
+			else
+				state.activeJobs.forEach((activeJob, index) => {
+					if (activeJob.id === id) {
+						state.activeJobs[index] = {
+							...state.activeJobs[index],
+							status,
+							message
+						};
+					}
+				});
 	},
 	setJobs(state, jobs) {
 		state.activeJobs = jobs;
@@ -45,6 +47,7 @@ const mutations = {
 		state.activeJobs.forEach((activeJob, index) => {
 			if (activeJob.id === jobId) {
 				state.activeJobs.splice(index, 1);
+				state.removedJobIds.push(jobId);
 			}
 		});
 	}