Browse Source

Fixed and improved station users collector, turned it into a task

Kristian Vos 4 years ago
parent
commit
02882a2fef
3 changed files with 139 additions and 130 deletions
  1. 8 127
      backend/logic/actions/stations.js
  2. 4 0
      backend/logic/stations.js
  3. 127 3
      backend/logic/tasks.js

+ 8 - 127
backend/logic/actions/stations.js

@@ -14,129 +14,10 @@ const StationsModule = moduleManager.modules.stations;
 const ActivitiesModule = moduleManager.modules.activities;
 const YouTubeModule = moduleManager.modules.youtube;
 
-const userList = {};
-let usersPerStation = {};
-let usersPerStationCount = {};
-
-// Temporarily disabled until the messages in console can be limited
-setInterval(async () => {
-	const stationsCountUpdated = [];
-	const stationsUpdated = [];
-
-	// const oldUsersPerStation = usersPerStation;
-	usersPerStation = {};
-
-	const oldUsersPerStationCount = usersPerStationCount;
-	usersPerStationCount = {};
-
-	const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" }, { isQuiet: true });
-
-	async.each(
-		Object.keys(userList),
-		(socketId, next) => {
-			IOModule.runJob("SOCKET_FROM_SESSION", { socketId }, { isQuiet: true }).then(socket => {
-				const stationId = userList[socketId];
-
-				if (!socket || Object.keys(socket.rooms).indexOf(`station.${stationId}`) === -1) {
-					if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
-					if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
-					delete userList[socketId];
-					return next();
-				}
-
-				if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0;
-				usersPerStationCount[stationId] += 1;
-				if (!usersPerStation[stationId]) usersPerStation[stationId] = [];
-
-				return async.waterfall(
-					[
-						next => {
-							if (!socket.session || !socket.session.sessionId) return next("No session found.");
-							return CacheModule.runJob(
-								"HGET",
-								{
-									table: "sessions",
-									key: socket.session.sessionId
-								},
-								{ isQuiet: true }
-							)
-								.then(session => {
-									next(null, session);
-								})
-								.catch(next);
-						},
-
-						(session, next) => {
-							if (!session) return next("Session not found.");
-							return userModel.findOne({ _id: session.userId }, next);
-						},
-
-						(user, next) => {
-							if (!user) return next("User not found.");
-							if (usersPerStation[stationId].indexOf(user.username) !== -1)
-								return next("User already in the list.");
-							return next(null, { username: user.username, avatar: user.avatar });
-						}
-					],
-					(err, user) => {
-						if (!err) {
-							usersPerStation[stationId].push(user);
-						}
-						next();
-					}
-				);
-			});
-			// TODO Code to show users
-		},
-		() => {
-			Object.keys(usersPerStationCount).forEach(stationId => {
-				if (oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]) {
-					if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
-				}
-			});
-
-			// Object.keys(usersPerStation).forEach(stationId => {
-			// 	if (
-			// 		_.difference(usersPerStation[stationId], oldUsersPerStation[stationId]).length > 0 ||
-			// 		_.difference(oldUsersPerStation[stationId], usersPerStation[stationId]).length > 0
-			// 	) {
-			// 		if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
-			// 	}
-			// });
-
-			stationsCountUpdated.forEach(stationId => {
-				console.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
-				CacheModule.runJob(
-					"PUB",
-					{
-						table: "station.updateUserCount",
-						value: stationId
-					},
-					{ isQuiet: true }
-				);
-			});
-
-			stationsUpdated.forEach(stationId => {
-				console.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
-				CacheModule.runJob(
-					"PUB",
-					{
-						table: "station.updateUsers",
-						value: stationId
-					},
-					{ isQuiet: true }
-				);
-			});
-
-			// console.log("Userlist", usersPerStation);
-		}
-	);
-}, 3000);
-
 CacheModule.runJob("SUB", {
 	channel: "station.updateUsers",
 	cb: stationId => {
-		const list = usersPerStation[stationId] || [];
+		const list = StationsModule.usersPerStation[stationId] || [];
 		IOModule.runJob("EMIT_TO_ROOM", {
 			room: `station.${stationId}`,
 			args: ["event:users.updated", list]
@@ -147,7 +28,7 @@ CacheModule.runJob("SUB", {
 CacheModule.runJob("SUB", {
 	channel: "station.updateUserCount",
 	cb: stationId => {
-		const count = usersPerStationCount[stationId] || 0;
+		const count = StationsModule.usersPerStationCount[stationId] || 0;
 		IOModule.runJob("EMIT_TO_ROOM", {
 			room: `station.${stationId}`,
 			args: ["event:userCount.updated", count]
@@ -286,7 +167,7 @@ CacheModule.runJob("SUB", {
 
 		StationsModule.runJob("INITIALIZE_STATION", { stationId }).then(async response => {
 			const { station } = response;
-			station.userCount = usersPerStationCount[stationId] || 0;
+			station.userCount = StationsModule.usersPerStationCount[stationId] || 0;
 			IOModule.runJob("EMIT_TO_ROOM", {
 				room: "admin.stations",
 				args: ["event:admin.station.added", station]
@@ -361,7 +242,7 @@ export default {
 								],
 								(err, exists) => {
 									if (err) console.log(err);
-									station.userCount = usersPerStationCount[station._id] || 0;
+									station.userCount = StationsModule.usersPerStationCount[station._id] || 0;
 									if (exists) filteredStations.push(station);
 									next();
 								}
@@ -604,14 +485,14 @@ export default {
 						genres: station.genres,
 						blacklistedGenres: station.blacklistedGenres
 					};
-					userList[session.socketId] = station._id;
+					StationsModule.userList[session.socketId] = station._id;
 					next(null, data);
 				},
 
 				(data, next) => {
 					data = JSON.parse(JSON.stringify(data));
-					data.userCount = usersPerStationCount[data._id] || 0;
-					data.users = usersPerStation[data._id] || [];
+					data.userCount = StationsModule.usersPerStationCount[data._id] || 0;
+					data.users = StationsModule.usersPerStation[data._id] || [];
 					if (!data.currentSong || !data.currentSong.title) return next(null, data);
 					IOModule.runJob("SOCKET_JOIN_SONG_ROOM", {
 						socketId: session.socketId,
@@ -890,7 +771,7 @@ export default {
 				}
 				console.log("SUCCESS", "STATIONS_LEAVE", `Left station "${stationId}" successfully.`);
 				IOModule.runJob("SOCKET_LEAVE_ROOMS", { socketId: session });
-				delete userList[session.socketId];
+				delete StationsModule.userList[session.socketId];
 				return cb({
 					status: "success",
 					message: "Successfully left station.",

+ 4 - 0
backend/logic/stations.js

@@ -40,6 +40,10 @@ class _StationsModule extends CoreClass {
 			dislikes: -1
 		};
 
+		this.userList = {};
+		this.usersPerStation = {};
+		this.usersPerStationCount = {};
+
 		// TEMP
 		CacheModule.runJob("SUB", {
 			channel: "station.pause",

+ 127 - 3
backend/logic/tasks.js

@@ -13,6 +13,7 @@ let CacheModule;
 let StationsModule;
 let UtilsModule;
 let IOModule;
+let DBModule;
 
 class _TasksModule extends CoreClass {
 	// eslint-disable-next-line require-jsdoc
@@ -37,6 +38,7 @@ class _TasksModule extends CoreClass {
 			StationsModule = this.moduleManager.modules.stations;
 			UtilsModule = this.moduleManager.modules.utils;
 			IOModule = this.moduleManager.modules.io;
+			DBModule = this.moduleManager.modules.db;
 
 			// this.createTask("testTask", testTask, 5000, true);
 
@@ -52,10 +54,16 @@ class _TasksModule extends CoreClass {
 				timeout: 1000 * 60 * 60 * 6
 			});
 
+			// TasksModule.runJob("CREATE_TASK", {
+			// 	name: "logFileSizeCheckTask",
+			// 	fn: TasksModule.logFileSizeCheckTask,
+			// 	timeout: 1000 * 60 * 60
+			// });
+
 			TasksModule.runJob("CREATE_TASK", {
-				name: "logFileSizeCheckTask",
-				fn: TasksModule.logFileSizeCheckTask,
-				timeout: 1000 * 60 * 60
+				name: "collectStationUsersTask",
+				fn: TasksModule.collectStationUsersTask,
+				timeout: 1000 * 3
 			});
 
 			resolve();
@@ -328,6 +336,122 @@ class _TasksModule extends CoreClass {
 			);
 		});
 	}
+
+	/**
+	 * Periodically collect users in stations
+	 *
+	 * @returns {Promise} - returns promise (reject, resolve)
+	 */
+	collectStationUsersTask() {
+		return new Promise(async resolve => {
+			TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);
+
+			const stationsCountUpdated = [];
+			const stationsUpdated = [];
+
+			const oldUsersPerStation = JSON.parse(JSON.stringify(StationsModule.usersPerStation));
+			const usersPerStation = {};
+
+			const oldUsersPerStationCount = JSON.parse(JSON.stringify(StationsModule.usersPerStationCount));
+			const usersPerStationCount = {};
+
+			const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });
+
+			async.each(
+				Object.keys(StationsModule.userList),
+				(socketId, next) => {
+					IOModule.runJob("SOCKET_FROM_SESSION", { socketId }).then(socket => {
+						const stationId = StationsModule.userList[socketId];
+
+						if (!socket || Object.keys(socket.rooms).indexOf(`station.${stationId}`) === -1) {
+							if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
+							if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
+							delete StationsModule.userList[socketId];
+							return next();
+						}
+
+						if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0;
+						usersPerStationCount[stationId] += 1;
+						if (!usersPerStation[stationId]) usersPerStation[stationId] = [];
+
+						return async.waterfall(
+							[
+								next => {
+									if (!socket.session || !socket.session.sessionId) return next("No session found.");
+									return CacheModule.runJob("HGET", {
+										table: "sessions",
+										key: socket.session.sessionId
+									})
+										.then(session => {
+											next(null, session);
+										})
+										.catch(next);
+								},
+
+								(session, next) => {
+									if (!session) return next("Session not found.");
+									return userModel.findOne({ _id: session.userId }, next);
+								},
+
+								(user, next) => {
+									if (!user) return next("User not found.");
+									if (usersPerStation[stationId].indexOf(user.username) !== -1)
+										return next("User already in the list.");
+									return next(null, { username: user.username, avatar: user.avatar });
+								}
+							],
+							(err, user) => {
+								if (!err) {
+									usersPerStation[stationId].push(user);
+								}
+								next();
+							}
+						);
+					});
+					// TODO Code to show users
+				},
+				() => {
+					Object.keys(usersPerStationCount).forEach(stationId => {
+						if (oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]) {
+							if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);
+						}
+					});
+
+					// Object.keys(usersPerStation).forEach(stationId => {
+					// 	if (
+					// 		_.difference(usersPerStation[stationId], oldUsersPerStation[stationId]).length > 0 ||
+					// 		_.difference(oldUsersPerStation[stationId], usersPerStation[stationId]).length > 0
+					// 	) {
+					// 		if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(stationId);
+					// 	}
+					// });
+
+					StationsModule.usersPerStationCount = usersPerStationCount;
+					StationsModule.usersPerStation = usersPerStation;
+
+					stationsCountUpdated.forEach(stationId => {
+						console.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);
+						CacheModule.runJob("PUB", {
+							table: "station.updateUserCount",
+							value: stationId
+						});
+					});
+
+					stationsUpdated.forEach(stationId => {
+						console.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);
+						CacheModule.runJob("PUB", {
+							table: "station.updateUsers",
+							value: stationId
+						});
+					});
+
+					// console.log("Userlist", StationsModule.usersPerStation);
+				}
+			);
+
+			resolve();
+		});
+	}
 }
 
 export default new _TasksModule();