浏览代码

fix(socket.io -> WS): 'SOCKETS_FROM_USER' job executes correctly

Signed-off-by: Jonathan <theflametrooper@gmail.com>
Jonathan 4 年之前
父节点
当前提交
18acc0043e

+ 2 - 2
backend/logic/actions/activities.js

@@ -13,7 +13,7 @@ CacheModule.runJob("SUB", {
 	channel: "activity.removeAllForUser",
 	cb: userId => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId }, this).then(sockets =>
-			sockets.forEach(socket => socket.emit("event:activity.removeAllForUser"))
+			sockets.forEach(socket => socket.dispatch("event:activity.removeAllForUser"))
 		);
 
 		IOModule.runJob("EMIT_TO_ROOM", {
@@ -27,7 +27,7 @@ CacheModule.runJob("SUB", {
 	channel: "activity.hide",
 	cb: res => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: res.userId }, this).then(sockets =>
-			sockets.forEach(socket => socket.emit("event:activity.hide", res.activityId))
+			sockets.forEach(socket => socket.dispatch("event:activity.hide", res.activityId))
 		);
 
 		IOModule.runJob("EMIT_TO_ROOM", {

+ 7 - 7
backend/logic/actions/playlists.js

@@ -17,7 +17,7 @@ CacheModule.runJob("SUB", {
 	channel: "playlist.create",
 	cb: playlist => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: playlist.createdBy }, this).then(sockets => {
-			sockets.forEach(socket => socket.emit("event:playlist.create", playlist));
+			sockets.forEach(socket => socket.dispatch("event:playlist.create", playlist));
 		});
 
 		if (playlist.privacy === "public")
@@ -33,7 +33,7 @@ CacheModule.runJob("SUB", {
 	cb: res => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: res.userId }, this).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:playlist.delete", res.playlistId);
+				socket.dispatch("event:playlist.delete", res.playlistId);
 			});
 		});
 
@@ -49,7 +49,7 @@ CacheModule.runJob("SUB", {
 	cb: res => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: res.userId }, this).then(sockets =>
 			sockets.forEach(socket =>
-				socket.emit("event:playlist.repositionSongs", {
+				socket.dispatch("event:playlist.repositionSongs", {
 					playlistId: res.playlistId,
 					songsBeingChanged: res.songsBeingChanged
 				})
@@ -63,7 +63,7 @@ CacheModule.runJob("SUB", {
 	cb: res => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: res.userId }, this).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:playlist.addSong", {
+				socket.dispatch("event:playlist.addSong", {
 					playlistId: res.playlistId,
 					song: res.song
 				});
@@ -89,7 +89,7 @@ CacheModule.runJob("SUB", {
 	cb: res => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: res.userId }, this).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:playlist.removeSong", {
+				socket.dispatch("event:playlist.removeSong", {
 					playlistId: res.playlistId,
 					songId: res.songId
 				});
@@ -115,7 +115,7 @@ CacheModule.runJob("SUB", {
 	cb: res => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: res.userId }, this).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:playlist.updateDisplayName", {
+				socket.dispatch("event:playlist.updateDisplayName", {
 					playlistId: res.playlistId,
 					displayName: res.displayName
 				});
@@ -141,7 +141,7 @@ CacheModule.runJob("SUB", {
 	cb: res => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: res.userId }, this).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:playlist.updatePrivacy", {
+				socket.dispatch("event:playlist.updatePrivacy", {
 					playlist: res.playlist
 				});
 			});

+ 4 - 4
backend/logic/actions/songs.js

@@ -64,7 +64,7 @@ CacheModule.runJob("SUB", {
 		});
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: data.userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:song.newRatings", {
+				socket.dispatch("event:song.newRatings", {
 					songId: data.songId,
 					liked: true,
 					disliked: false
@@ -90,7 +90,7 @@ CacheModule.runJob("SUB", {
 		});
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: data.userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:song.newRatings", {
+				socket.dispatch("event:song.newRatings", {
 					songId: data.songId,
 					liked: false,
 					disliked: true
@@ -116,7 +116,7 @@ CacheModule.runJob("SUB", {
 		});
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: data.userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:song.newRatings", {
+				socket.dispatch("event:song.newRatings", {
 					songId: data.songId,
 					liked: false,
 					disliked: false
@@ -142,7 +142,7 @@ CacheModule.runJob("SUB", {
 		});
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: data.userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:song.newRatings", {
+				socket.dispatch("event:song.newRatings", {
 					songId: data.songId,
 					liked: false,
 					disliked: false

+ 13 - 13
backend/logic/actions/stations.js

@@ -59,9 +59,9 @@ CacheModule.runJob("SUB", {
 								}).then(userModel =>
 									userModel.findOne({ _id: session.userId }, (err, user) => {
 										if (user.role === "admin")
-											socket.emit("event:userCount.updated", stationId, count);
+											socket.dispatch("event:userCount.updated", stationId, count);
 										else if (station.type === "community" && station.owner === session.userId)
-											socket.emit("event:userCount.updated", stationId, count);
+											socket.dispatch("event:userCount.updated", stationId, count);
 									})
 								);
 						});
@@ -137,7 +137,7 @@ CacheModule.runJob("SUB", {
 			}).then(response => {
 				const { socketsThatCan } = response;
 				socketsThatCan.forEach(socket => {
-					socket.emit("event:station.pause", { stationId });
+					socket.dispatch("event:station.pause", { stationId });
 				});
 			});
 		});
@@ -160,7 +160,7 @@ CacheModule.runJob("SUB", {
 				.then(response => {
 					const { socketsThatCan } = response;
 					socketsThatCan.forEach(socket => {
-						socket.emit("event:station.resume", { stationId });
+						socket.dispatch("event:station.resume", { stationId });
 					});
 				})
 				.catch(console.log);
@@ -190,10 +190,10 @@ CacheModule.runJob("SUB", {
 					}).then(response => {
 						const { socketsThatCan, socketsThatCannot } = response;
 						socketsThatCan.forEach(socket => {
-							socket.emit("event:station.updatePrivacy", { stationId, privacy: station.privacy });
+							socket.dispatch("event:station.updatePrivacy", { stationId, privacy: station.privacy });
 						});
 						socketsThatCannot.forEach(socket => {
-							socket.emit("event:station.removed", { stationId });
+							socket.dispatch("event:station.removed", { stationId });
 						});
 					});
 				} else {
@@ -205,7 +205,7 @@ CacheModule.runJob("SUB", {
 					}).then(response => {
 						const { socketsThatCan } = response;
 						socketsThatCan.forEach(socket => {
-							socket.emit("event:station.updatePrivacy", { stationId, privacy: station.privacy });
+							socket.dispatch("event:station.updatePrivacy", { stationId, privacy: station.privacy });
 						});
 					});
 				}
@@ -225,7 +225,7 @@ CacheModule.runJob("SUB", {
 				station
 			}).then(response => {
 				const { socketsThatCan } = response;
-				socketsThatCan.forEach(socket => socket.emit("event:station.updateName", { stationId, name }));
+				socketsThatCan.forEach(socket => socket.dispatch("event:station.updateName", { stationId, name }));
 			})
 		);
 
@@ -248,7 +248,7 @@ CacheModule.runJob("SUB", {
 			}).then(response => {
 				const { socketsThatCan } = response;
 				socketsThatCan.forEach(socket =>
-					socket.emit("event:station.updateDisplayName", { stationId, displayName })
+					socket.dispatch("event:station.updateDisplayName", { stationId, displayName })
 				);
 			})
 		);
@@ -272,7 +272,7 @@ CacheModule.runJob("SUB", {
 			}).then(response => {
 				const { socketsThatCan } = response;
 				socketsThatCan.forEach(socket =>
-					socket.emit("event:station.updateDescription", { stationId, description })
+					socket.dispatch("event:station.updateDescription", { stationId, description })
 				);
 			})
 		);
@@ -299,7 +299,7 @@ CacheModule.runJob("SUB", {
 			}).then(response => {
 				const { socketsThatCan } = response;
 				socketsThatCan.forEach(socket => {
-					socket.emit("event:station.themeUpdated", { stationId, theme: station.theme });
+					socket.dispatch("event:station.themeUpdated", { stationId, theme: station.theme });
 				});
 			});
 		});
@@ -379,9 +379,9 @@ CacheModule.runJob("SUB", {
 						}).then(session => {
 							if (session) {
 								userModel.findOne({ _id: session.userId }, (err, user) => {
-									if (user.role === "admin") socket.emit("event:stations.created", station);
+									if (user.role === "admin") socket.dispatch("event:stations.created", station);
 									else if (station.type === "community" && station.owner === session.userId)
-										socket.emit("event:stations.created", station);
+										socket.dispatch("event:stations.created", station);
 								});
 							}
 						});

+ 11 - 11
backend/logic/actions/users.js

@@ -23,7 +23,7 @@ CacheModule.runJob("SUB", {
 	cb: res => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: res.userId }, this).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("keep.event:user.preferences.changed", res.preferences);
+				socket.dispatch("keep.event:user.preferences.changed", res.preferences);
 			});
 		});
 	}
@@ -34,7 +34,7 @@ CacheModule.runJob("SUB", {
 	cb: res => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: res.userId }, this).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:user.orderOfPlaylists.changed", res.orderOfPlaylists);
+				socket.dispatch("event:user.orderOfPlaylists.changed", res.orderOfPlaylists);
 			});
 		});
 
@@ -50,7 +50,7 @@ CacheModule.runJob("SUB", {
 	cb: user => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: user._id }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:user.username.changed", user.username);
+				socket.dispatch("event:user.username.changed", user.username);
 			});
 		});
 	}
@@ -61,7 +61,7 @@ CacheModule.runJob("SUB", {
 	cb: userId => {
 		IOModule.runJob("SOCKETS_FROM_USER_WITHOUT_CACHE", { userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("keep.event:user.session.removed");
+				socket.dispatch("keep.event:user.session.removed");
 			});
 		});
 	}
@@ -72,7 +72,7 @@ CacheModule.runJob("SUB", {
 	cb: userId => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:user.linkPassword");
+				socket.dispatch("event:user.linkPassword");
 			});
 		});
 	}
@@ -83,7 +83,7 @@ CacheModule.runJob("SUB", {
 	cb: userId => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:user.unlinkPassword");
+				socket.dispatch("event:user.unlinkPassword");
 			});
 		});
 	}
@@ -94,7 +94,7 @@ CacheModule.runJob("SUB", {
 	cb: userId => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:user.linkGithub");
+				socket.dispatch("event:user.linkGithub");
 			});
 		});
 	}
@@ -105,7 +105,7 @@ CacheModule.runJob("SUB", {
 	cb: userId => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:user.unlinkGithub");
+				socket.dispatch("event:user.unlinkGithub");
 			});
 		});
 	}
@@ -116,7 +116,7 @@ CacheModule.runJob("SUB", {
 	cb: data => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: data.userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("keep.event:banned", data.punishment);
+				socket.dispatch("keep.event:banned", data.punishment);
 				socket.disconnect(true);
 			});
 		});
@@ -128,7 +128,7 @@ CacheModule.runJob("SUB", {
 	cb: data => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: data.userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:user.favoritedStation", data.stationId);
+				socket.dispatch("event:user.favoritedStation", data.stationId);
 			});
 		});
 	}
@@ -139,7 +139,7 @@ CacheModule.runJob("SUB", {
 	cb: data => {
 		IOModule.runJob("SOCKETS_FROM_USER", { userId: data.userId }).then(sockets => {
 			sockets.forEach(socket => {
-				socket.emit("event:user.unfavoritedStation", data.stationId);
+				socket.dispatch("event:user.unfavoritedStation", data.stationId);
 			});
 		});
 	}

+ 3 - 3
backend/logic/activities.js

@@ -74,7 +74,7 @@ class _ActivitiesModule extends CoreClass {
 					(activity, next) => {
 						IOModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId }, this)
 							.then(sockets => {
-								sockets.forEach(socket => socket.emit("event:activity.create", activity));
+								sockets.forEach(socket => socket.dispatch("event:activity.create", activity));
 								next(null, activity);
 							})
 							.catch(next);
@@ -227,7 +227,7 @@ class _ActivitiesModule extends CoreClass {
 
 							IOModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)
 								.then(sockets =>
-									sockets.forEach(socket => socket.emit("event:activity.hide", activity._id))
+									sockets.forEach(socket => socket.dispatch("event:activity.hide", activity._id))
 								)
 								.catch(next);
 
@@ -332,7 +332,7 @@ class _ActivitiesModule extends CoreClass {
 
 							IOModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)
 								.then(sockets =>
-									sockets.forEach(socket => socket.emit("event:activity.hide", activity._id))
+									sockets.forEach(socket => socket.dispatch("event:activity.hide", activity._id))
 								)
 								.catch(next);
 

+ 6 - 27
backend/logic/io.js

@@ -152,20 +152,15 @@ class _IOModule extends CoreClass {
 			const sockets = [];
 
 			return async.eachLimit(
-				Object.keys(IOModule._io.clients),
+				IOModule._io.clients,
 				1,
-				(id, next) => {
-					const { session } = IOModule._io.clients[id];
-
-					console.log(1, session);
+				(socket, next) => {
+					const { sessionId } = socket.session;
 
-					if (session.sessionId) {
-						return CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId }, this)
+					if (sessionId) {
+						return CacheModule.runJob("HGET", { table: "sessions", key: sessionId }, this)
 							.then(session => {
-								console.log(2, session, payload.userId);
-
-								if (session && session.userId === payload.userId)
-									sockets.push(IOModule._io.clients[id]);
+								if (session && session.userId === payload.userId) sockets.push(socket);
 								next();
 							})
 							.catch(err => next(err));
@@ -174,8 +169,6 @@ class _IOModule extends CoreClass {
 					return next();
 				},
 				err => {
-					console.log("SOCKETS_FROM_USER", sockets, err);
-
 					if (err) return reject(err);
 					return resolve(sockets);
 				}
@@ -293,21 +286,7 @@ class _IOModule extends CoreClass {
 	 * @returns {Promise} - returns promise (reject, resolve)
 	 */
 	async EMIT_TO_ROOM(payload) {
-		console.log("EMIT_TO_ROOM", payload.room);
-
 		return new Promise(resolve => {
-			// IOModule._io.clients.forEach(socket => {
-			// 	console.log(1234);
-			// 	console.log(socket, socket.rooms);
-
-			// 	if (socket.rooms[payload.room]) {
-			// 		console.log(payload.args);
-			// 		socket.dispatch(...payload.args);
-			// 	}
-			// });
-
-			console.log(IOModule.rooms, payload.room);
-
 			if (IOModule.rooms[payload.room])
 				return IOModule.rooms[payload.room].forEach(async socketId => {
 					const socket = await IOModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this);

+ 2 - 2
backend/logic/stations.js

@@ -953,7 +953,7 @@ class _StationsModule extends CoreClass {
 														(err, user) => {
 															if (!err && user) {
 																if (user.role === "admin")
-																	socket.emit(
+																	socket.dispatch(
 																		"event:station.nextSong",
 																		station._id,
 																		station.currentSong
@@ -962,7 +962,7 @@ class _StationsModule extends CoreClass {
 																	station.type === "community" &&
 																	station.owner === session.userId
 																)
-																	socket.emit(
+																	socket.dispatch(
 																		"event:station.nextSong",
 																		station._id,
 																		station.currentSong