Browse Source

feat: started working on long processes/actions with updates to the user

Kristian Vos 2 years ago
parent
commit
0a83d3db28

+ 40 - 2
backend/core.js

@@ -1,4 +1,5 @@
 import config from "config";
+import { EventEmitter } from "events";
 
 class DeferredPromise {
 	// eslint-disable-next-line require-jsdoc
@@ -169,7 +170,7 @@ class Queue {
 
 class Job {
 	// eslint-disable-next-line require-jsdoc
-	constructor(name, payload, onFinish, module, parentJob) {
+	constructor(name, payload, onFinish, module, parentJob, options) {
 		this.name = name;
 		this.payload = payload;
 		this.response = null;
@@ -186,6 +187,10 @@ class Job {
 		});
 		this.status = "INITIALIZED";
 		this.task = null;
+		this.onProgress = options.onProgress;
+		this.lastProgressData = null;
+		this.lastProgressTime = Date.now();
+		this.lastProgressTimeout = null;
 	}
 
 	/**
@@ -258,6 +263,33 @@ class Job {
 		args.splice(1, 0, this.name); // Adds the name of the job as the first argument (after INFO/SUCCESS/ERROR).
 		this.module.log(...args);
 	}
+
+	/**
+	 * 
+	 * @param {data} data - Data to publish upon progress
+	 */
+	publishProgress(data) {
+		if (this.onProgress) {
+			this.lastProgressData = data;
+			if (data.status === "update") {
+				if ((Date.now() - this.lastProgressTime) > 1000) {
+					this.lastProgressTime = Date.now();
+				} else {
+					if (this.lastProgressTimeout) clearTimeout(this.lastProgressTimeout);
+					this.lastProgressTimeout = setTimeout(() => {
+						this.lastProgressTime = Date.now();
+						this.lastProgressTimeout = null;
+						this.onProgress.emit("progress", data);
+					}, Date.now() - this.lastProgressTime);
+					return;
+				}
+			} else if (data.status === "success" || data.status === "error")
+				if (this.lastProgressTimeout) clearTimeout(this.lastProgressTimeout);
+			
+
+			this.onProgress.emit("progress", data);
+		} else this.log("Progress published, but no onProgress specified.")
+	}
 }
 
 class MovingAverageCalculator {
@@ -475,9 +507,15 @@ export default class CoreClass {
 		} else _priority = priority;
 
 		if (!_options) _options = { isQuiet: false };
+		if (_options && typeof _options.onProgress === "function") {
+			const onProgress = new EventEmitter();
+			onProgress.on("progress", _options.onProgress);
+			_options.onProgress = onProgress;
+		}
+		if (!_options.onProgress && parentJob) _options.onProgress = parentJob.onProgress;
 
 		const deferredPromise = new DeferredPromise();
-		const job = new Job(name, payload, deferredPromise, this, _parentJob);
+		const job = new Job(name, payload, deferredPromise, this, _parentJob, { onProgress: _options.onProgress });
 
 		this.log("INFO", `Queuing job ${name} (${job.toString()})`);
 

+ 32 - 2
backend/logic/actions/songs.js

@@ -1310,6 +1310,12 @@ export default {
 	 */
 	editTags: isAdminRequired(async function editTags(session, method, tags, songIds, cb) {
 		const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
+
+		this.publishProgress({
+			status: "started",
+			message: "Updating tags."
+		});
+
 		async.waterfall(
 			[
 				next => {
@@ -1335,13 +1341,29 @@ export default {
 						return;
 					}
 
+					this.publishProgress({
+						status: "update",
+						message: "Updating tags in MongoDB."
+					});
 					songModel.updateMany({ _id: { $in: songsFound } }, query, { runValidators: true }, err => {
 						if (err) {
 							next(err);
 							return;
 						}
-						SongsModule.runJob("UPDATE_SONGS", { songIds: songsFound });
-						next();
+
+						SongsModule.runJob(
+							"UPDATE_SONGS",
+							{
+								songIds: songsFound
+							},
+							this
+						)
+							.then(() => {
+								next();
+							})
+							.catch(() => {
+								next();
+							});
 					});
 				}
 			],
@@ -1349,9 +1371,17 @@ export default {
 				if (err && err !== true) {
 					err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
 					this.log("ERROR", "EDIT_TAGS", `User ${session.userId} failed to edit tags. '${err}'`);
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					cb({ status: "error", message: err });
 				} else {
 					this.log("SUCCESS", "EDIT_TAGS", `User ${session.userId} has successfully edited tags.`);
+					this.publishProgress({
+						status: "success",
+						message: "Successfully edited tags."
+					});
 					cb({
 						status: "success",
 						message: "Successfully edited tags."

+ 18 - 0
backend/logic/songs.js

@@ -520,6 +520,8 @@ class _SongsModule extends CoreClass {
 					next => {
 						const { songIds } = payload;
 
+						this.publishProgress({ status: "update", message: `Updating songs (stage 1)` });
+
 						SongsModule.SongModel.find({ _id: songIds }, next);
 					},
 
@@ -527,6 +529,8 @@ class _SongsModule extends CoreClass {
 					(songs, next) => {
 						const { songIds } = payload;
 
+						this.publishProgress({ status: "update", message: `Updating songs (stage 2)` });
+
 						async.eachLimit(
 							songIds,
 							1,
@@ -548,6 +552,8 @@ class _SongsModule extends CoreClass {
 
 					// Adds/updates all songs in the cache
 					(songs, next) => {
+						this.publishProgress({ status: "update", message: `Updating songs (stage 3)` });
+
 						async.eachLimit(
 							songs,
 							1,
@@ -574,6 +580,8 @@ class _SongsModule extends CoreClass {
 
 					// Updates all playlists that the songs are in by setting the new trimmed song
 					(songs, next) => {
+						this.publishProgress({ status: "update", message: `Updating songs (stage 4)` });
+
 						const trimmedSongs = songs.map(song => {
 							const { _id, youtubeId, title, artists, thumbnail, duration, verified } = song;
 							return {
@@ -626,6 +634,8 @@ class _SongsModule extends CoreClass {
 
 					// Updates all playlists that the songs are in
 					(songs, playlistsToUpdate, next) => {
+						this.publishProgress({ status: "update", message: `Updating songs (stage 5)` });
+
 						async.eachLimit(
 							playlistsToUpdate,
 							1,
@@ -652,6 +662,8 @@ class _SongsModule extends CoreClass {
 
 					// Updates all station queues that the songs are in by setting the new trimmed song
 					(songs, next) => {
+						this.publishProgress({ status: "update", message: `Updating songs (stage 6)` });
+
 						const stationsToUpdate = new Set();
 
 						async.eachLimit(
@@ -701,6 +713,8 @@ class _SongsModule extends CoreClass {
 
 					// Updates all playlists that the songs are in
 					(songs, stationsToUpdate, next) => {
+						this.publishProgress({ status: "update", message: `Updating songs (stage 7)` });
+
 						async.eachLimit(
 							stationsToUpdate,
 							1,
@@ -727,6 +741,8 @@ class _SongsModule extends CoreClass {
 
 					// Autofill the genre playlists of all genres of all songs
 					(songs, next) => {
+						this.publishProgress({ status: "update", message: `Updating songs (stage 8)` });
+
 						const genresToAutofill = new Set();
 
 						songs.forEach(song => {
@@ -754,6 +770,8 @@ class _SongsModule extends CoreClass {
 
 					// Send event that the song was updated
 					(songs, next) => {
+						this.publishProgress({ status: "update", message: `Updating songs (stage 9)` });
+
 						async.eachLimit(
 							songs,
 							1,

+ 10 - 7
backend/logic/ws.js

@@ -458,7 +458,7 @@ class _WSModule extends CoreClass {
 			const { socket, req } = payload;
 			let SID = "";
 
-			socket.ip = req.headers["x-forwarded-for"] || "0..0.0";
+			socket.ip = req.headers["x-forwarded-for"] || "0.0.0.0";
 
 			async.waterfall(
 				[
@@ -638,13 +638,16 @@ class _WSModule extends CoreClass {
 				if (!WSModule.actions[namespace][action]) return socket.dispatch("ERROR", "Action not found.");
 
 				if (data[data.length - 1].CB_REF) {
-					const { CB_REF } = data[data.length - 1];
+					const { CB_REF, onProgress } = data[data.length - 1];
 					data.pop();
 
-					return socket.actions.emit(data.shift(0), [...data, res => socket.dispatch("CB_REF", CB_REF, res)]);
+					return socket.actions.emit(data.shift(0), {
+						args: [...data, res => socket.dispatch("CB_REF", CB_REF, res)],
+						onProgress: onProgress ? res => socket.dispatch("PROGRESS_CB_REF", CB_REF, res) : null
+					});
 				}
 
-				return socket.actions.emit(data.shift(0), data);
+				return socket.actions.emit(data.shift(0), { args: data });
 			};
 
 			// have the socket listen for each action
@@ -654,9 +657,9 @@ class _WSModule extends CoreClass {
 					const name = `${namespace}.${action}`;
 
 					// listen for this action to be called
-					socket.listen(name, async args =>
-						WSModule.runJob("RUN_ACTION", { socket, namespace, action, args })
-					);
+					socket.listen(name, async ({ args, onProgress }) => {
+						WSModule.runJob("RUN_ACTION", { socket, namespace, action, args }, { onProgress });
+					});
 				});
 			});
 

+ 27 - 4
frontend/src/components/modals/BulkActions.vue

@@ -131,15 +131,38 @@ export default {
 			this.items.splice(index, 1);
 		},
 		applyChanges() {
+			let toast;
+
 			this.socket.dispatch(
 				this.type.action,
 				this.method,
 				this.items,
 				this.type.items,
-				res => {
-					new Toast(res.message);
-					if (res.status === "success")
-						this.closeModal("bulkActions");
+				{
+					cb: () => {
+						// new Toast(res.message);
+						// if (res.status === "success")
+						// 	this.closeModal("bulkActions");
+					},
+					onProgress: res => {
+						if (!toast) {
+							toast = new Toast({
+								content: res.message,
+								persistent: true,
+								interactable: false
+							});
+						} else {
+							toast.content = res.message;
+						}
+						if (
+							res.status === "success" ||
+							res.status === "error"
+						) {
+							setTimeout(() => {
+								toast.destroy();
+							}, 4000);
+						}
+					}
 				}
 			);
 		},

+ 30 - 5
frontend/src/ws.js

@@ -17,6 +17,8 @@ const onDisconnect = {
 const CB_REFS = {};
 let CB_REF = 0;
 
+const PROGRESS_CB_REFS = {};
+
 export default {
 	socket: null,
 	dispatcher: null,
@@ -45,6 +47,14 @@ export default {
 				delete CB_REFS[id];
 		});
 
+		Object.keys(PROGRESS_CB_REFS).forEach(id => {
+			if (
+				id.indexOf("$event:") !== -1 &&
+				id.indexOf("$event:keep.") === -1
+			)
+				delete PROGRESS_CB_REFS[id];
+		});
+
 		// destroy all listeners that aren't site-wide
 		Object.keys(this.socket.dispatcher.listeners).forEach(type => {
 			if (type.indexOf("keep.") === -1 && type !== "ready")
@@ -82,22 +92,33 @@ export default {
 			}
 
 			dispatch(...args) {
-				CB_REF += 1;
-
 				if (this.readyState !== 1)
 					return pendingDispatches.push(() =>
 						waitForConnectionToDispatch(...args)
 					);
 
-				const cb = args[args.length - 1];
+				const lastArg = args[args.length - 1];
 
-				if (typeof cb === "function") {
-					CB_REFS[CB_REF] = cb;
+				if (typeof lastArg === "function") {
+					CB_REF += 1;
+					CB_REFS[CB_REF] = lastArg;
 
 					return this.send(
 						JSON.stringify([...args.slice(0, -1), { CB_REF }])
 					);
 				}
+				if (typeof lastArg === "object") {
+					CB_REF += 1;
+					CB_REFS[CB_REF] = lastArg.cb;
+					PROGRESS_CB_REFS[CB_REF] = lastArg.onProgress;
+
+					return this.send(
+						JSON.stringify([
+							...args.slice(0, -1),
+							{ CB_REF, onProgress: true }
+						])
+					);
+				}
 
 				return this.send(JSON.stringify([...args]));
 			}
@@ -119,6 +140,10 @@ export default {
 				CB_REFS[CB_REF](...data);
 				return delete CB_REFS[CB_REF];
 			}
+			if (name === "PROGRESS_CB_REF") {
+				const PROGRESS_CB_REF = data.shift(0);
+				PROGRESS_CB_REFS[PROGRESS_CB_REF](...data);
+			}
 
 			if (name === "ERROR") console.log("WS: SOCKET ERROR:", data[0]);