Ver código fonte

refactor: Integrated long jobs

Owen Diffey 1 ano atrás
pai
commit
e180739887

+ 25 - 0
backend/logic/actions/media.js

@@ -129,6 +129,23 @@ export default {
 	 * @param cb
 	 */
 	recalculateAllRatings: isAdminRequired(async function recalculateAllRatings(session, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Recalculate all ratings",
+			message: "Recalculating all ratings.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -145,9 +162,17 @@ export default {
 				if (err) {
 					err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
 					this.log("ERROR", "MEDIA_RECALCULATE_ALL_RATINGS", `Failed to recalculate all ratings. "${err}"`);
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 				this.log("SUCCESS", "MEDIA_RECALCULATE_ALL_RATINGS", `Recalculated all ratings successfully.`);
+				this.publishProgress({
+					status: "success",
+					message: "Successfully recalculated all ratings."
+				});
 				return cb({ status: "success", message: "Successfully recalculated all ratings." });
 			}
 		);

+ 188 - 8
backend/logic/actions/playlists.js

@@ -1253,16 +1253,34 @@ export default {
 	 * @param {boolean} musicOnly - whether to only add music to the playlist
 	 * @param {Function} cb - gets called with the result
 	 */
-	addSetToPlaylist: isLoginRequired(function addSetToPlaylist(session, url, playlistId, musicOnly, cb) {
+	addSetToPlaylist: isLoginRequired(async function addSetToPlaylist(session, url, playlistId, musicOnly, cb) {
 		let videosInPlaylistTotal = 0;
 		let songsInPlaylistTotal = 0;
 		let addSongsStats = null;
 
 		const addedSongs = [];
 
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Import YouTube playlist",
+			message: "Importing YouTube playlist.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
+					this.publishProgress({ status: "update", message: `Importing YouTube playlist (stage 1)` });
 					YouTubeModule.runJob("GET_PLAYLIST", { url, musicOnly }, this)
 						.then(res => {
 							if (res.filteredSongs) {
@@ -1278,6 +1296,7 @@ export default {
 						});
 				},
 				(youtubeIds, next) => {
+					this.publishProgress({ status: "update", message: `Importing YouTube playlist (stage 2)` });
 					let successful = 0;
 					let failed = 0;
 					let alreadyInPlaylist = 0;
@@ -1340,12 +1359,14 @@ export default {
 				},
 
 				next => {
+					this.publishProgress({ status: "update", message: `Importing YouTube playlist (stage 3)` });
 					PlaylistsModule.runJob("GET_PLAYLIST", { playlistId }, this)
 						.then(playlist => next(null, playlist))
 						.catch(next);
 				},
 
 				(playlist, next) => {
+					this.publishProgress({ status: "update", message: `Importing YouTube playlist (stage 4)` });
 					if (!playlist || playlist.createdBy !== session.userId) {
 						return DBModule.runJob("GET_MODEL", { modelName: "user" }, this).then(userModel => {
 							userModel.findOne({ _id: session.userId }, (err, user) => {
@@ -1366,6 +1387,10 @@ export default {
 						"PLAYLIST_IMPORT",
 						`Importing a YouTube playlist to private playlist "${playlistId}" failed for user "${session.userId}". "${err}"`
 					);
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 
@@ -1384,7 +1409,10 @@ export default {
 					"PLAYLIST_IMPORT",
 					`Successfully imported a YouTube playlist to private playlist "${playlistId}" for user "${session.userId}". Videos in playlist: ${videosInPlaylistTotal}, songs in playlist: ${songsInPlaylistTotal}, songs successfully added: ${addSongsStats.successful}, songs failed: ${addSongsStats.failed}, already in playlist: ${addSongsStats.alreadyInPlaylist}, already in liked ${addSongsStats.alreadyInLikedPlaylist}, already in disliked ${addSongsStats.alreadyInDislikedPlaylist}.`
 				);
-
+				this.publishProgress({
+					status: "success",
+					message: `Playlist has been imported. ${addSongsStats.successful} were added successfully, ${addSongsStats.failed} failed (${addSongsStats.alreadyInPlaylist} were already in the playlist)`
+				});
 				return cb({
 					status: "success",
 					message: `Playlist has been imported. ${addSongsStats.successful} were added successfully, ${addSongsStats.failed} failed (${addSongsStats.alreadyInPlaylist} were already in the playlist)`,
@@ -1963,6 +1991,23 @@ export default {
 	 * @param {Function} cb - gets called with the result
 	 */
 	deleteOrphanedStationPlaylists: isAdminRequired(async function index(session, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Delete orphaned station playlists",
+			message: "Deleting orphaned station playlists.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -1979,6 +2024,10 @@ export default {
 						"PLAYLISTS_DELETE_ORPHANED_STATION_PLAYLISTS",
 						`Deleting orphaned station playlists failed. "${err}"`
 					);
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 				this.log(
@@ -1986,6 +2035,10 @@ export default {
 					"PLAYLISTS_DELETE_ORPHANED_STATION_PLAYLISTS",
 					"Deleting orphaned station playlists successful."
 				);
+				this.publishProgress({
+					status: "success",
+					message: "Successfully deleted orphaned station playlists."
+				});
 				return cb({ status: "success", message: "Successfully deleted orphaned station playlists." });
 			}
 		);
@@ -1998,6 +2051,23 @@ export default {
 	 * @param {Function} cb - gets called with the result
 	 */
 	deleteOrphanedGenrePlaylists: isAdminRequired(async function index(session, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Delete orphaned genre playlists",
+			message: "Deleting orphaned genre playlists.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -2014,6 +2084,10 @@ export default {
 						"PLAYLISTS_DELETE_ORPHANED_GENRE_PLAYLISTS",
 						`Deleting orphaned genre playlists failed. "${err}"`
 					);
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 				this.log(
@@ -2021,6 +2095,10 @@ export default {
 					"PLAYLISTS_DELETE_ORPHANED_GENRE_PLAYLISTS",
 					"Deleting orphaned genre playlists successful."
 				);
+				this.publishProgress({
+					status: "success",
+					message: "Successfully deleted orphaned genre playlists."
+				});
 				return cb({ status: "success", message: "Successfully deleted orphaned genre playlists." });
 			}
 		);
@@ -2033,6 +2111,23 @@ export default {
 	 * @param {Function} cb - gets called with the result
 	 */
 	requestOrphanedPlaylistSongs: isAdminRequired(async function index(session, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Request orphaned playlist songs",
+			message: "Requesting orphaned playlist songs.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -2049,6 +2144,10 @@ export default {
 						"REQUEST_ORPHANED_PLAYLIST_SONGS",
 						`Requesting orphaned playlist songs failed. "${err}"`
 					);
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 				this.log(
@@ -2056,6 +2155,10 @@ export default {
 					"REQUEST_ORPHANED_PLAYLIST_SONGS",
 					"Requesting orphaned playlist songs was successful."
 				);
+				this.publishProgress({
+					status: "success",
+					message: "Successfully requested orphaned playlist songs."
+				});
 				return cb({ status: "success", message: "Successfully requested orphaned playlist songs." });
 			}
 		);
@@ -2168,6 +2271,23 @@ export default {
 	 * @param {Function} cb - gets called with the result
 	 */
 	clearAndRefillAllStationPlaylists: isAdminRequired(async function index(session, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Clear and refill all station playlists",
+			message: "Clearing and refilling all station playlists.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -2185,6 +2305,10 @@ export default {
 						playlists,
 						1,
 						(playlist, next) => {
+							this.publishProgress({
+								status: "update",
+								message: `Clearing and refilling "${playlist._id}"`
+							});
 							PlaylistsModule.runJob(
 								"CLEAR_AND_REFILL_STATION_PLAYLIST",
 								{ playlistId: playlist._id },
@@ -2210,7 +2334,10 @@ export default {
 						"PLAYLIST_CLEAR_AND_REFILL_ALL_STATION_PLAYLISTS",
 						`Clearing and refilling all station playlists failed for user "${session.userId}". "${err}"`
 					);
-
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 
@@ -2219,7 +2346,10 @@ export default {
 					"PLAYLIST_CLEAR_AND_REFILL_ALL_STATION_PLAYLISTS",
 					`Successfully cleared and refilled all station playlists for user "${session.userId}".`
 				);
-
+				this.publishProgress({
+					status: "success",
+					message: "Playlists have been successfully cleared and refilled."
+				});
 				return cb({
 					status: "success",
 					message: "Playlists have been successfully cleared and refilled"
@@ -2235,6 +2365,23 @@ export default {
 	 * @param {Function} cb - gets called with the result
 	 */
 	clearAndRefillAllGenrePlaylists: isAdminRequired(async function index(session, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Clear and refill all genre playlists",
+			message: "Clearing and refilling all genre playlists.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -2252,6 +2399,10 @@ export default {
 						playlists,
 						1,
 						(playlist, next) => {
+							this.publishProgress({
+								status: "update",
+								message: `Clearing and refilling "${playlist._id}"`
+							});
 							PlaylistsModule.runJob(
 								"CLEAR_AND_REFILL_GENRE_PLAYLIST",
 								{ playlistId: playlist._id },
@@ -2277,7 +2428,10 @@ export default {
 						"PLAYLIST_CLEAR_AND_REFILL_ALL_GENRE_PLAYLISTS",
 						`Clearing and refilling all genre playlists failed for user "${session.userId}". "${err}"`
 					);
-
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 
@@ -2286,7 +2440,10 @@ export default {
 					"PLAYLIST_CLEAR_AND_REFILL_ALL_GENRE_PLAYLISTS",
 					`Successfully cleared and refilled all genre playlists for user "${session.userId}".`
 				);
-
+				this.publishProgress({
+					status: "success",
+					message: "Playlists have been successfully cleared and refilled."
+				});
 				return cb({
 					status: "success",
 					message: "Playlists have been successfully cleared and refilled"
@@ -2302,6 +2459,23 @@ export default {
 	 * @param {Function} cb - gets called with the result
 	 */
 	createMissingGenrePlaylists: isAdminRequired(async function index(session, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Create missing genre playlists",
+			message: "Creating missing genre playlists.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -2323,7 +2497,10 @@ export default {
 						"PLAYLIST_CREATE_MISSING_GENRE_PLAYLISTS",
 						`Creating missing genre playlists failed for user "${session.userId}". "${err}"`
 					);
-
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 
@@ -2332,7 +2509,10 @@ export default {
 					"PLAYLIST_CREATE_MISSING_GENRE_PLAYLISTS",
 					`Successfully created missing genre playlists for user "${session.userId}".`
 				);
-
+				this.publishProgress({
+					status: "success",
+					message: "Missing genre playlists have been successfully created."
+				});
 				return cb({
 					status: "success",
 					message: "Missing genre playlists have been successfully created"

+ 165 - 4
backend/logic/actions/songs.js

@@ -210,6 +210,23 @@ export default {
 	 * @param cb
 	 */
 	updateAll: isAdminRequired(async function updateAll(session, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Update all songs",
+			message: "Updating all songs.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -226,9 +243,17 @@ export default {
 				if (err) {
 					err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
 					this.log("ERROR", "SONGS_UPDATE_ALL", `Failed to update all songs. "${err}"`);
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 				this.log("SUCCESS", "SONGS_UPDATE_ALL", `Updated all songs successfully.`);
+				this.publishProgress({
+					status: "success",
+					message: "Successfully updated all songs."
+				});
 				return cb({ status: "success", message: "Successfully updated all songs." });
 			}
 		);
@@ -696,6 +721,23 @@ export default {
 		const successful = [];
 		const failed = [];
 
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Bulk remove songs",
+			message: "Removing songs.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -703,6 +745,7 @@ export default {
 						songIds,
 						1,
 						(songId, next) => {
+							this.publishProgress({ status: "update", message: `Removing song "${songId}"` });
 							WSModule.runJob(
 								"RUN_ACTION2",
 								{
@@ -735,6 +778,11 @@ export default {
 
 					this.log("ERROR", "SONGS_REMOVE_MANY", `Failed to remove songs "${failed.join(", ")}". "${err}"`);
 
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
+
 					return cb({ status: "error", message: err });
 				}
 
@@ -749,6 +797,11 @@ export default {
 
 				this.log("SUCCESS", "SONGS_REMOVE_MANY", `${message} "${successful.join(", ")}"`);
 
+				this.publishProgress({
+					status: "success",
+					message
+				});
+
 				return cb({
 					status: "success",
 					message
@@ -870,6 +923,23 @@ export default {
 		const successful = [];
 		const failed = [];
 
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Bulk verifying songs",
+			message: "Verifying songs.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -877,6 +947,7 @@ export default {
 						songIds,
 						1,
 						(songId, next) => {
+							this.publishProgress({ status: "update", message: `Verifying song "${songId}"` });
 							WSModule.runJob(
 								"RUN_ACTION2",
 								{
@@ -908,7 +979,10 @@ export default {
 					err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
 
 					this.log("ERROR", "SONGS_VERIFY_MANY", `Failed to verify songs "${failed.join(", ")}". "${err}"`);
-
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 
@@ -922,7 +996,10 @@ export default {
 				}
 
 				this.log("SUCCESS", "SONGS_VERIFY_MANY", `${message} "${successful.join(", ")}"`);
-
+				this.publishProgress({
+					status: "success",
+					message
+				});
 				return cb({
 					status: "success",
 					message
@@ -1007,6 +1084,23 @@ export default {
 		const successful = [];
 		const failed = [];
 
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Bulk unverifying songs",
+			message: "Unverifying songs.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -1014,6 +1108,7 @@ export default {
 						songIds,
 						1,
 						(songId, next) => {
+							this.publishProgress({ status: "update", message: `Unverifying song "${songId}"` });
 							WSModule.runJob(
 								"RUN_ACTION2",
 								{
@@ -1049,7 +1144,10 @@ export default {
 						"SONGS_UNVERIFY_MANY",
 						`Failed to unverify songs "${failed.join(", ")}". "${err}"`
 					);
-
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 
@@ -1067,7 +1165,10 @@ export default {
 				}
 
 				this.log("SUCCESS", "SONGS_UNVERIFY_MANY", `${message} "${successful.join(", ")}"`);
-
+				this.publishProgress({
+					status: "success",
+					message
+				});
 				return cb({
 					status: "success",
 					message
@@ -1123,6 +1224,24 @@ export default {
 	 */
 	editGenres: isAdminRequired(async function editGenres(session, method, genres, songIds, cb) {
 		const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
+
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Bulk editing genres",
+			message: "Updating genres.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -1148,6 +1267,10 @@ export default {
 						return;
 					}
 
+					this.publishProgress({
+						status: "update",
+						message: "Updating genres in MongoDB."
+					});
 					songModel.updateMany({ _id: { $in: songsFound } }, query, { runValidators: true }, err => {
 						if (err) {
 							next(err);
@@ -1162,9 +1285,17 @@ export default {
 				if (err && err !== true) {
 					err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
 					this.log("ERROR", "EDIT_GENRES", `User ${session.userId} failed to edit genres. '${err}'`);
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					cb({ status: "error", message: err });
 				} else {
 					this.log("SUCCESS", "EDIT_GENRES", `User ${session.userId} has successfully edited genres.`);
+					this.publishProgress({
+						status: "success",
+						message: "Successfully edited genres."
+					});
 					cb({
 						status: "success",
 						message: "Successfully edited genres."
@@ -1221,6 +1352,24 @@ export default {
 	 */
 	editArtists: isAdminRequired(async function editArtists(session, method, artists, songIds, cb) {
 		const songModel = await DBModule.runJob("GET_MODEL", { modelName: "song" }, this);
+
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Bulk editing artists",
+			message: "Updating artists.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -1246,6 +1395,10 @@ export default {
 						return;
 					}
 
+					this.publishProgress({
+						status: "update",
+						message: "Updating artists in MongoDB."
+					});
 					songModel.updateMany({ _id: { $in: songsFound } }, query, { runValidators: true }, err => {
 						if (err) {
 							next(err);
@@ -1260,9 +1413,17 @@ export default {
 				if (err && err !== true) {
 					err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
 					this.log("ERROR", "EDIT_ARTISTS", `User ${session.userId} failed to edit artists. '${err}'`);
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					cb({ status: "error", message: err });
 				} else {
 					this.log("SUCCESS", "EDIT_ARTISTS", `User ${session.userId} has successfully edited artists.`);
+					this.publishProgress({
+						status: "success",
+						message: "Successfully edited artists."
+					});
 					cb({
 						status: "success",
 						message: "Successfully edited artists."

+ 25 - 0
backend/logic/actions/stations.js

@@ -2469,6 +2469,23 @@ export default {
 	 * @param {Function} cb - gets called with the result
 	 */
 	clearEveryStationQueue: isAdminRequired(async function clearEveryStationQueue(session, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Clear every station queue",
+			message: "Clearing every station queue.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		async.waterfall(
 			[
 				next => {
@@ -2481,9 +2498,17 @@ export default {
 				if (err) {
 					err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
 					this.log("ERROR", "CLEAR_EVERY_STATION_QUEUE", `Clearing every station queue failed. "${err}"`);
+					this.publishProgress({
+						status: "error",
+						message: err
+					});
 					return cb({ status: "error", message: err });
 				}
 				this.log("SUCCESS", "CLEAR_EVERY_STATION_QUEUE", "Clearing every station queue was successful.");
+				this.publishProgress({
+					status: "success",
+					message: "Successfully cleared every station queue."
+				});
 				return cb({ status: "success", message: "Successfully cleared every station queue." });
 			}
 		);

+ 51 - 2
backend/logic/actions/youtube.js

@@ -165,6 +165,23 @@ export default {
 	 * @returns {{status: string, data: object}}
 	 */
 	resetStoredApiRequests: isAdminRequired(async function resetStoredApiRequests(session, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Reset stored API requests",
+			message: "Resetting stored API requests.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		YouTubeModule.runJob("RESET_STORED_API_REQUESTS", {}, this)
 			.then(() => {
 				this.log(
@@ -172,6 +189,10 @@ export default {
 					"YOUTUBE_RESET_STORED_API_REQUESTS",
 					`Resetting stored API requests was successful.`
 				);
+				this.publishProgress({
+					status: "success",
+					message: "Successfully reset stored YouTube API requests."
+				});
 				return cb({ status: "success", message: "Successfully reset stored YouTube API requests" });
 			})
 			.catch(async err => {
@@ -181,6 +202,10 @@ export default {
 					"YOUTUBE_RESET_STORED_API_REQUESTS",
 					`Resetting stored API requests failed. "${err}"`
 				);
+				this.publishProgress({
+					status: "error",
+					message: err
+				});
 				return cb({ status: "error", message: err });
 			});
 	}),
@@ -340,16 +365,40 @@ export default {
 	 *
 	 * @returns {{status: string, data: object}}
 	 */
-	removeVideos: isAdminRequired(function removeVideos(session, videoIds, cb) {
+	removeVideos: isAdminRequired(async function removeVideos(session, videoIds, cb) {
+		this.keepLongJob();
+		this.publishProgress({
+			status: "started",
+			title: "Bulk remove YouTube videos",
+			message: "Bulk removing YouTube videos.",
+			id: this.toString()
+		});
+		await CacheModule.runJob("RPUSH", { key: `longJobs.${session.userId}`, value: this.toString() }, this);
+		await CacheModule.runJob(
+			"PUB",
+			{
+				channel: "longJob.added",
+				value: { jobId: this.toString(), userId: session.userId }
+			},
+			this
+		);
+
 		YouTubeModule.runJob("REMOVE_VIDEOS", { videoIds }, this)
 			.then(() => {
 				this.log("SUCCESS", "YOUTUBE_REMOVE_VIDEOS", `Removing videos was successful.`);
-
+				this.publishProgress({
+					status: "success",
+					message: "Successfully removed YouTube videos."
+				});
 				return cb({ status: "success", message: "Successfully removed YouTube videos" });
 			})
 			.catch(async err => {
 				err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
 				this.log("ERROR", "YOUTUBE_REMOVE_VIDEOS", `Removing videos failed. "${err}"`);
+				this.publishProgress({
+					status: "error",
+					message: err
+				});
 				return cb({ status: "error", message: err });
 			});
 	}),

+ 4 - 0
backend/logic/media.js

@@ -226,6 +226,10 @@ class _MediaModule extends CoreClass {
 							youtubeIds,
 							2,
 							(youtubeId, next) => {
+								this.publishProgress({
+									status: "update",
+									message: `Recalculating ratings for ${youtubeId}`
+								});
 								MediaModule.runJob("RECALCULATE_RATINGS", { youtubeId }, this)
 									.then(() => {
 										next();

+ 2 - 0
backend/logic/playlists.js

@@ -754,6 +754,7 @@ class _PlaylistsModule extends CoreClass {
 						response.playlists,
 						1,
 						(playlist, next) => {
+							this.publishProgress({ status: "update", message: `Deleting "${playlist._id}"` });
 							PlaylistsModule.runJob("DELETE_PLAYLIST", { playlistId: playlist._id }, this)
 								.then(() => {
 									this.log("INFO", "Deleting orphaned genre playlist");
@@ -827,6 +828,7 @@ class _PlaylistsModule extends CoreClass {
 						response.playlists,
 						1,
 						(playlist, next) => {
+							this.publishProgress({ status: "update", message: `Deleting "${playlist._id}"` });
 							PlaylistsModule.runJob("DELETE_PLAYLIST", { playlistId: playlist._id }, this)
 								.then(() => {
 									this.log("INFO", "Deleting orphaned station playlist");

+ 5 - 0
backend/logic/songs.js

@@ -790,6 +790,7 @@ class _SongsModule extends CoreClass {
 							(song, next) => {
 								index += 1;
 								console.log(`Updating song #${index} out of ${length}: ${song._id}`);
+								this.publishProgress({ status: "update", message: `Updating song "${song._id}"` });
 								SongsModule.runJob("UPDATE_SONG", { songId: song._id }, this)
 									.then(() => {
 										next();
@@ -1118,6 +1119,10 @@ class _SongsModule extends CoreClass {
 								async.waterfall(
 									[
 										next => {
+											this.publishProgress({
+												status: "update",
+												message: `Requesting "${youtubeId}"`
+											});
 											console.log(
 												youtubeId,
 												`this is song ${youtubeIds.indexOf(youtubeId) + 1}/${youtubeIds.length}`

+ 4 - 0
backend/logic/stations.js

@@ -1644,6 +1644,10 @@ class _StationsModule extends CoreClass {
 											stations,
 											1,
 											(station, next) => {
+												this.publishProgress({
+													status: "update",
+													message: `Updating station "${station._id}"`
+												});
 												StationsModule.runJob("UPDATE_STATION", {
 													stationId: station._id
 												})

+ 11 - 0
backend/logic/youtube.js

@@ -1189,10 +1189,12 @@ class _YouTubeModule extends CoreClass {
 			async.waterfall(
 				[
 					next => {
+						this.publishProgress({ status: "update", message: `Resetting stored API requests (stage 1)` });
 						YouTubeModule.youtubeApiRequestModel.find({}, next);
 					},
 
 					(apiRequests, next) => {
+						this.publishProgress({ status: "update", message: `Resetting stored API requests (stage 2)` });
 						YouTubeModule.youtubeApiRequestModel.deleteMany({}, err => {
 							if (err) next("Couldn't reset stored YouTube API requests.");
 							else {
@@ -1202,18 +1204,21 @@ class _YouTubeModule extends CoreClass {
 					},
 
 					(apiRequests, next) => {
+						this.publishProgress({ status: "update", message: `Resetting stored API requests (stage 3)` });
 						CacheModule.runJob("DEL", { key: "youtubeApiRequestParams" }, this)
 							.then(() => next(null, apiRequests))
 							.catch(err => next(err));
 					},
 
 					(apiRequests, next) => {
+						this.publishProgress({ status: "update", message: `Resetting stored API requests (stage 4)` });
 						CacheModule.runJob("DEL", { key: "youtubeApiRequestResults" }, this)
 							.then(() => next(null, apiRequests))
 							.catch(err => next(err));
 					},
 
 					(apiRequests, next) => {
+						this.publishProgress({ status: "update", message: `Resetting stored API requests (stage 5)` });
 						async.eachLimit(
 							apiRequests.map(apiRequest => apiRequest._id),
 							1,
@@ -1466,6 +1471,7 @@ class _YouTubeModule extends CoreClass {
 						if (!videoIds.every(videoId => mongoose.Types.ObjectId.isValid(videoId)))
 							next("One or more videoIds are not a valid ObjectId.");
 						else {
+							this.publishProgress({ status: "update", message: `Removing video (stage 1)` });
 							YouTubeModule.youtubeVideoModel.find({ _id: { $in: videoIds } }, (err, videos) => {
 								if (err) next(err);
 								else
@@ -1478,6 +1484,7 @@ class _YouTubeModule extends CoreClass {
 					},
 
 					(youtubeIds, next) => {
+						this.publishProgress({ status: "update", message: `Removing video (stage 2)` });
 						SongsModule.SongModel.find({ youtubeId: { $in: youtubeIds } }, (err, songs) => {
 							if (err) next(err);
 							else {
@@ -1492,12 +1499,14 @@ class _YouTubeModule extends CoreClass {
 					},
 
 					(youtubeIds, next) => {
+						this.publishProgress({ status: "update", message: `Removing video (stage 3)` });
 						MediaModule.runJob("REMOVE_RATINGS", { youtubeIds }, this)
 							.then(() => next(null, youtubeIds))
 							.catch(next);
 					},
 
 					(youtubeIds, next) => {
+						this.publishProgress({ status: "update", message: `Removing video (stage 4)` });
 						async.eachLimit(
 							youtubeIds,
 							2,
@@ -1601,10 +1610,12 @@ class _YouTubeModule extends CoreClass {
 					},
 
 					next => {
+						this.publishProgress({ status: "update", message: `Removing video (stage 5)` });
 						YouTubeModule.youtubeVideoModel.deleteMany({ _id: { $in: videoIds } }, next);
 					},
 
 					(res, next) => {
+						this.publishProgress({ status: "update", message: `Removing video (stage 6)` });
 						CacheModule.runJob("PUB", {
 							channel: "youtube.removeVideos",
 							value: videoIds

+ 18 - 10
frontend/src/components/RunJobDropdown.vue

@@ -55,8 +55,6 @@
 <script>
 import { mapGetters } from "vuex";
 
-import Toast from "toasters";
-
 export default {
 	props: {
 		jobs: {
@@ -76,14 +74,24 @@ export default {
 	},
 	methods: {
 		runJob(job) {
-			new Toast(`Running job: ${job.name}`);
-			this.socket.dispatch(job.socket, data => {
-				if (data.status !== "success")
-					new Toast({
-						content: `Error: ${data.message}`,
-						timeout: 8000
-					});
-				else new Toast({ content: data.message, timeout: 4000 });
+			let id;
+			let title;
+
+			this.socket.dispatch(job.socket, {
+				cb: () => {},
+				onProgress: res => {
+					if (res.status === "started") {
+						id = res.id;
+						title = res.title;
+					}
+
+					if (id)
+						this.setJob({
+							id,
+							name: title,
+							...res
+						});
+				}
 			});
 		}
 	}

+ 1 - 5
frontend/src/components/modals/BulkActions.vue

@@ -140,11 +140,7 @@ export default {
 				this.items,
 				this.type.items,
 				{
-					cb: () => {
-						// new Toast(res.message);
-						// if (res.status === "success")
-						// 	this.closeCurrentModal();
-					},
+					cb: () => {},
 					onProgress: res => {
 						if (res.status === "started") {
 							id = res.id;

+ 14 - 41
frontend/src/components/modals/EditPlaylist/Tabs/ImportPlaylists.vue

@@ -56,7 +56,8 @@ export default {
 	},
 	methods: {
 		importPlaylist() {
-			let isImportingPlaylist = true;
+			let id;
+			let title;
 
 			// import query is blank
 			if (!this.youtubeSearch.playlist.query)
@@ -72,53 +73,25 @@ export default {
 				});
 			}
 
-			// don't give starting import message instantly in case of instant error
-			setTimeout(() => {
-				if (isImportingPlaylist) {
-					new Toast(
-						"Starting to import your playlist. This can take some time to do."
-					);
-				}
-			}, 750);
-
 			return this.socket.dispatch(
 				"playlists.addSetToPlaylist",
 				this.youtubeSearch.playlist.query,
 				this.playlist._id,
 				this.youtubeSearch.playlist.isImportingOnlyMusic,
-				res => {
-					new Toast({ content: res.message, timeout: 20000 });
-					if (res.status === "success") {
-						isImportingPlaylist = false;
-
-						const {
-							songsInPlaylistTotal,
-							videosInPlaylistTotal,
-							alreadyInLikedPlaylist,
-							alreadyInDislikedPlaylist
-						} = res.data.stats;
-
-						if (this.youtubeSearch.playlist.isImportingOnlyMusic) {
-							new Toast({
-								content: `${songsInPlaylistTotal} of the ${videosInPlaylistTotal} videos in the playlist were songs.`,
-								timeout: 20000
-							});
+				{
+					cb: () => {},
+					onProgress: res => {
+						if (res.status === "started") {
+							id = res.id;
+							title = res.title;
 						}
-						if (
-							alreadyInLikedPlaylist > 0 ||
-							alreadyInDislikedPlaylist > 0
-						) {
-							let message = "";
-							if (alreadyInLikedPlaylist > 0) {
-								message = `${alreadyInLikedPlaylist} songs were already in your Liked Songs playlist. A song cannot be in both the Liked Songs playlist and the Disliked Songs playlist at the same time.`;
-							} else {
-								message = `${alreadyInDislikedPlaylist} songs were already in your Disliked Songs playlist. A song cannot be in both the Liked Songs playlist and the Disliked Songs playlist at the same time.`;
-							}
-							new Toast({
-								content: message,
-								timeout: 20000
+
+						if (id)
+							this.setJob({
+								id,
+								name: title,
+								...res
 							});
-						}
 					}
 				}
 			);

+ 54 - 6
frontend/src/pages/Admin/Songs/index.vue

@@ -585,11 +585,27 @@ export default {
 			});
 		},
 		verifyMany(selectedRows) {
+			let id;
+			let title;
+
 			this.socket.dispatch(
 				"songs.verifyMany",
 				selectedRows.map(row => row._id),
-				res => {
-					new Toast(res.message);
+				{
+					cb: () => {},
+					onProgress: res => {
+						if (res.status === "started") {
+							id = res.id;
+							title = res.title;
+						}
+
+						if (id)
+							this.setJob({
+								id,
+								name: title,
+								...res
+							});
+					}
 				}
 			);
 		},
@@ -599,11 +615,27 @@ export default {
 			});
 		},
 		unverifyMany(selectedRows) {
+			let id;
+			let title;
+
 			this.socket.dispatch(
 				"songs.unverifyMany",
 				selectedRows.map(row => row._id),
-				res => {
-					new Toast(res.message);
+				{
+					cb: () => {},
+					onProgress: res => {
+						if (res.status === "started") {
+							id = res.id;
+							title = res.title;
+						}
+
+						if (id)
+							this.setJob({
+								id,
+								name: title,
+								...res
+							});
+					}
 				}
 			);
 		},
@@ -673,11 +705,27 @@ export default {
 			});
 		},
 		deleteMany(selectedRows) {
+			let id;
+			let title;
+
 			this.socket.dispatch(
 				"songs.removeMany",
 				selectedRows.map(row => row._id),
-				res => {
-					new Toast(res.message);
+				{
+					cb: () => {},
+					onProgress: res => {
+						if (res.status === "started") {
+							id = res.id;
+							title = res.title;
+						}
+
+						if (id)
+							this.setJob({
+								id,
+								name: title,
+								...res
+							});
+					}
 				}
 			);
 		},

+ 19 - 5
frontend/src/pages/Admin/YouTube/Videos.vue

@@ -349,11 +349,25 @@ export default {
 			);
 		},
 		removeVideos(videoIds) {
-			this.socket.dispatch(
-				"youtube.removeVideos",
-				videoIds,
-				res => new Toast(res.message)
-			);
+			let id;
+			let title;
+
+			this.socket.dispatch("youtube.removeVideos", videoIds, {
+				cb: () => {},
+				onProgress: res => {
+					if (res.status === "started") {
+						id = res.id;
+						title = res.title;
+					}
+
+					if (id)
+						this.setJob({
+							id,
+							name: title,
+							...res
+						});
+				}
+			});
 		},
 		getDateFormatted(createdAt) {
 			const date = new Date(createdAt);