Browse Source

feat: added UPDATE_SONGS job on the backend to more efficiently update many songs

Kristian Vos 3 năm trước cách đây
mục cha
2 tập tin đã thay đổi với 287 bổ sung11 xóa
  1. 10 11
  2. 277 0

+ 10 - 11

@@ -410,22 +410,21 @@ export default {
 	 * At this time only used in EditSongs
 	 * @param {object} session - the session object automatically added by the websocket
-	 * @param {array} songIds - the song ids
+	 * @param {Array} songIds - the song ids
 	 * @param {Function} cb
-	 getSongsFromSongIds: isAdminRequired(function getSongFromSongId(session, songIds, cb) {
+	getSongsFromSongIds: isAdminRequired(function getSongFromSongId(session, songIds, cb) {
 				next => {
-					SongsModule.runJob("GET_SONGS", { songIds, properties: [
-						"youtubeId",
-						"title",
-						"artists",
-						"thumbnail",
-						"duration",
-						"verified",
-						"_id"
-					] }, this)
+					SongsModule.runJob(
+						"GET_SONGS",
+						{
+							songIds,
+							properties: ["youtubeId", "title", "artists", "thumbnail", "duration", "verified", "_id"]
+						},
+						this
+					)
 						.then(response => next(null, response.songs))
 						.catch(err => next(err));

+ 277 - 0

@@ -467,6 +467,283 @@ class _SongsModule extends CoreClass {
+	/**
+	 * Gets multiple songs from id from Mongo and updates the cache with it
+	 *
+	 * @param {object} payload - an object containing the payload
+	 * @param {Array} payload.songIds - the ids of the songs we are trying to update
+	 * @param {string} payload.oldStatus - old status of song being updated (optional)
+	 * @returns {Promise} - returns a promise (resolve, reject)
+	 */
+	async UPDATE_SONGS(payload) {
+		const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
+		const stationModel = await DBModule.runJob("GET_MODEL", { modelName: "station" }, this);
+		return new Promise((resolve, reject) =>
+			async.waterfall(
+				[
+					// Get songs from Mongo
+					next => {
+						const { songIds } = payload;
+						SongsModule.SongModel.find({ _id: songIds }, next);
+					},
+					// Any songs that were not in Mongo, remove from cache, if they're in the cache
+					(songs, next) => {
+						const { songIds } = payload;
+						async.eachLimit(
+							songIds,
+							1,
+							(songId, next) => {
+								if (songs.findIndex(song => song._id.toString() === songId) === -1) {
+									// NOTE: could be made lower priority
+									CacheModule.runJob("HDEL", {
+										table: "songs",
+										key: songId
+									});
+									next();
+								} else next();
+							},
+							() => {
+								next(null, songs);
+							}
+						);
+					},
+					// Adds/updates all songs in the cache
+					(songs, next) => {
+						async.eachLimit(
+							songs,
+							1,
+							(song, next) => {
+								CacheModule.runJob(
+									"HSET",
+									{
+										table: "songs",
+										key: song._id,
+										value: song
+									},
+									this
+								)
+									.then(() => {
+										next();
+									})
+									.catch(next);
+							},
+							() => {
+								next(null, songs);
+							}
+						);
+					},
+					// Updates all playlists that the songs are in by setting the new trimmed song
+					(songs, next) => {
+						const trimmedSongs = => {
+							const { _id, youtubeId, title, artists, thumbnail, duration, verified } = song;
+							return {
+								_id,
+								youtubeId,
+								title,
+								artists,
+								thumbnail,
+								duration,
+								verified
+							};
+						});
+						const playlistsToUpdate = new Set();
+						async.eachLimit(
+							trimmedSongs,
+							1,
+							(trimmedSong, next) => {
+								async.waterfall(
+									[
+										next => {
+											playlistModel.updateMany(
+												{ "songs._id": trimmedSong._id },
+												{ $set: { "songs.$": trimmedSong } },
+												next
+											);
+										},
+										(res, next) => {
+											playlistModel.find({ "songs._id": trimmedSong._id }, next);
+										},
+										(playlists, next) => {
+											playlists.forEach(playlist => {
+												playlistsToUpdate.add(playlist._id.toString());
+											});
+											next();
+										}
+									],
+									next
+								);
+							},
+							err => {
+								next(err, songs, playlistsToUpdate);
+							}
+						);
+					},
+					// Updates all playlists that the songs are in
+					(songs, playlistsToUpdate, next) => {
+						async.eachLimit(
+							playlistsToUpdate,
+							1,
+							(playlistId, next) => {
+								PlaylistsModule.runJob(
+									"UPDATE_PLAYLIST",
+									{
+										playlistId
+									},
+									this
+								)
+									.then(() => {
+										next();
+									})
+									.catch(err => {
+										next(err);
+									});
+							},
+							err => {
+								next(err, songs);
+							}
+						);
+					},
+					// Updates all station queues that the songs are in by setting the new trimmed song
+					(songs, next) => {
+						const stationsToUpdate = new Set();
+						async.eachLimit(
+							songs,
+							1,
+							(song, next) => {
+								async.waterfall(
+									[
+										next => {
+											const { youtubeId, title, artists, thumbnail, duration, verified } = song;
+											stationModel.updateMany(
+												{ "queue._id": song._id },
+												{
+													$set: {
+														"queue.$.youtubeId": youtubeId,
+														"queue.$.title": title,
+														"queue.$.artists": artists,
+														"queue.$.thumbnail": thumbnail,
+														"queue.$.duration": duration,
+														"queue.$.verified": verified
+													}
+												},
+												next
+											);
+										},
+										(res, next) => {
+											stationModel.find({ "queue._id": song._id }, next);
+										},
+										(stations, next) => {
+											stations.forEach(station => {
+												stationsToUpdate.add(station._id.toString());
+											});
+											next();
+										}
+									],
+									next
+								);
+							},
+							err => {
+								next(err, songs, stationsToUpdate);
+							}
+						);
+					},
+					// Updates all playlists that the songs are in
+					(songs, stationsToUpdate, next) => {
+						async.eachLimit(
+							stationsToUpdate,
+							1,
+							(stationId, next) => {
+								StationsModule.runJob(
+									"UPDATE_STATION",
+									{
+										stationId
+									},
+									this
+								)
+									.then(() => {
+										next();
+									})
+									.catch(err => {
+										next(err);
+									});
+							},
+							err => {
+								next(err, songs);
+							}
+						);
+					},
+					// Autofill the genre playlists of all genres of all songs
+					(songs, next) => {
+						const genresToAutofill = new Set();
+						songs.forEach(song => {
+							song.genres.forEach(genre => {
+								genresToAutofill.add(genre);
+							});
+						});
+						async.eachLimit(
+							genresToAutofill,
+							1,
+							(genre, next) => {
+								PlaylistsModule.runJob("AUTOFILL_GENRE_PLAYLIST", { genre }, this)
+									.then(() => {
+										next();
+									})
+									.catch(err => next(err));
+							},
+							err => {
+								next(err, songs);
+							}
+						);
+					},
+					// Send event that the song was updated
+					(songs, next) => {
+						async.eachLimit(
+							songs,
+							1,
+							(song, next) => {
+								CacheModule.runJob("PUB", {
+									channel: "song.updated",
+									value: { songId: song._id, oldStatus: null }
+								});
+								next();
+							},
+							() => {
+								next();
+							}
+						);
+					}
+				],
+				err => {
+					if (err && err !== true) return reject(new Error(err));
+					return resolve();
+				}
+			)
+		);
+	}
 	 * Updates all songs