123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- import config from "config";
- import express from "express";
- import http, { Server, IncomingMessage } from "node:http";
- import { RawData, WebSocketServer } from "ws";
- import { Types, isObjectIdOrHexString } from "mongoose";
- import BaseModule from "@/BaseModule";
- import { UniqueMethods } from "@/types/Modules";
- import WebSocket from "@/WebSocket";
- import ModuleManager from "@/ModuleManager";
- import JobQueue from "@/JobQueue";
- import DataModule from "./DataModule";
/**
 * WebSocketModule - Module that owns the HTTP server and the WebSocket
 * server mounted on it at "/ws". It accepts client connections, sends each
 * client a "ready" payload, and turns incoming messages into queued jobs.
 */
export class WebSocketModule extends BaseModule {
  private _httpServer?: Server;

  private _wsServer?: WebSocketServer;

  private _keepAliveInterval?: NodeJS.Timer;

  /**
   * WebSocket Module
   */
  public constructor() {
    super("websocket");
  }

  /**
   * startup - Startup websocket module
   *
   * Creates the HTTP server on the configured port, attaches the WebSocket
   * server to it and starts the periodic keep-alive timer.
   */
  public override async startup() {
    await super.startup();

    this._httpServer = http
      .createServer(express())
      .listen(config.get("port"));

    this._wsServer = new WebSocketServer({
      server: this._httpServer,
      path: "/ws",
      WebSocket
    });

    this._wsServer.on(
      "connection",
      (socket: WebSocket, request: IncomingMessage) =>
        this._handleConnection(socket, request)
    );

    // Run the keep-alive sweep every 45 seconds
    this._keepAliveInterval = setInterval(() => this._keepAlive(), 45000);

    // Stop the keep-alive sweep once the WebSocket server has closed
    this._wsServer.on("close", async () =>
      clearInterval(this._keepAliveInterval)
    );

    await super._started();
  }

  /**
   * keepAlive - Ping open clients and terminate closed
   *
   * Sockets in any other state (connecting / closing) are left alone.
   */
  private async _keepAlive() {
    if (!this._wsServer) return;

    // Iterate the client set directly: Set#entries() yields [value, value]
    // pairs, which would make every client be handled twice per sweep.
    for (const socket of this._wsServer.clients) {
      switch (socket.readyState) {
        case socket.OPEN:
          socket.ping();
          break;
        case socket.CLOSED:
          socket.terminate();
          break;
        default:
          break;
      }
    }
  }

  /**
   * handleConnection - Handle websocket connection
   *
   * Closes the socket straight away while the job queue is paused. Otherwise
   * tags the socket with its id, resolves the session and user from the
   * session cookie (if any), registers the socket event handlers and
   * dispatches the "ready" payload.
   *
   * @param socket - The newly connected socket
   * @param request - The HTTP upgrade request the connection came from
   */
  private async _handleConnection(
    socket: WebSocket,
    request: IncomingMessage
  ) {
    if (JobQueue.getStatus().isPaused) {
      socket.close();
      return;
    }

    // The handshake key sent by the client is used as the socket id
    socket.setSocketId(request.headers["sec-websocket-key"]);

    let sessionId;
    let user;

    if (request.headers.cookie) {
      const cookieName = config.get<string>("cookie");

      // Find the "name=value" pair whose name is the configured cookie
      const sessionCookie = request.headers.cookie
        .split("; ")
        .find(
          cookie =>
            cookie.substring(0, cookie.indexOf("=")) === cookieName
        );

      // Everything after the first "=" is the cookie value
      sessionId = sessionCookie?.substring(sessionCookie.indexOf("=") + 1);
    }

    if (sessionId && isObjectIdOrHexString(sessionId)) {
      socket.setSessionId(sessionId);

      const Session = await DataModule.getModel("sessions");

      // Looking the session up also refreshes its updatedAt timestamp
      const session = await Session.findByIdAndUpdate(sessionId, {
        updatedAt: Date.now()
      });

      if (session) {
        const User = await DataModule.getModel("users");

        user = await User.findById(session.userId);
      }
    }

    const readyData = {
      config: {
        cookie: config.get("cookie"),
        sitename: config.get("sitename"),
        recaptcha: {
          enabled: config.get("apis.recaptcha.enabled"),
          key: config.get("apis.recaptcha.key")
        },
        githubAuthentication: config.get("apis.github.enabled"),
        messages: config.get("messages"),
        christmas: config.get("christmas"),
        footerLinks: config.get("footerLinks"),
        shortcutOverrides: config.get("shortcutOverrides"),
        registrationDisabled: config.get("registrationDisabled"),
        mailEnabled: config.get("mail.enabled"),
        discogsEnabled: config.get("apis.discogs.enabled"),
        experimental: {
          changable_listen_mode: config.get(
            "experimental.changable_listen_mode"
          ),
          media_session: config.get("experimental.media_session"),
          disable_youtube_search: config.get(
            "experimental.disable_youtube_search"
          ),
          station_history: config.get("experimental.station_history"),
          soundcloud: config.get("experimental.soundcloud"),
          spotify: config.get("experimental.spotify")
        }
      },
      user
    };

    socket.log({
      type: "debug",
      message: `WebSocket opened #${socket.getSocketId()}`
    });

    socket.on("error", error =>
      socket.log({
        type: "error",
        message: error.message,
        data: { error }
      })
    );

    socket.on("close", async () => {
      // Run the events module's unsubscribeAll job for this socket
      await JobQueue.runJob(
        "events",
        "unsubscribeAll",
        {},
        {
          socketId: socket.getSocketId()
        }
      );

      socket.log({
        type: "debug",
        message: `WebSocket closed #${socket.getSocketId()}`
      });
    });

    socket.dispatch("ready", readyData);

    socket.on("message", message => this._handleMessage(socket, message));
  }

  /**
   * handleMessage - Handle websocket message
   *
   * A message is a JSON array `[ "module.job", payload, options ]`. The
   * callback reference is read from `options`, falling back to `payload`.
   * On failure the error is dispatched on "jobCallback" when a callback
   * reference is known, and on "error" otherwise.
   *
   * @param socket - The socket the message was received on
   * @param message - The raw message data
   */
  private async _handleMessage(socket: WebSocket, message: RawData) {
    if (JobQueue.getStatus().isPaused) {
      socket.close();
      return;
    }

    let callbackRef;

    try {
      const data = JSON.parse(message.toString());

      if (!Array.isArray(data) || data.length < 1)
        throw new Error("Invalid request");

      const [moduleJob, payload, options] = data;

      // Assign to the outer variable (not a new block-scoped one) so the
      // catch block below can route the error to the caller's callback.
      ({ callbackRef } = options ?? payload ?? {});

      if (typeof moduleJob !== "string") throw new Error("Invalid request");

      if (!callbackRef)
        throw new Error(
          `No callback reference provided for job ${moduleJob}`
        );

      // The first segment is the module name, the remainder the job name
      const [moduleName, ...jobNameParts] = moduleJob.split(".");
      const jobName = jobNameParts.join(".");

      const module = ModuleManager.getModule(moduleName);
      if (!module) throw new Error(`Module "${moduleName}" not found`);

      const Job = module.getJob(jobName);
      if (!Job?.isApiEnabled())
        throw new Error(`Job "${jobName}" not found.`);

      let session;
      if (socket.getSessionId()) {
        const Session = await DataModule.getModel("sessions");

        // Looking the session up also refreshes its updatedAt timestamp
        session = await Session.findByIdAndUpdate(
          socket.getSessionId(),
          {
            updatedAt: Date.now()
          }
        );
      }

      await JobQueue.queueJob(
        moduleName,
        jobName,
        payload,
        {},
        {
          session,
          socketId: socket.getSocketId(),
          callbackRef
        }
      );
    } catch (error) {
      const errorMessage = error?.message ?? error;

      if (callbackRef)
        socket.dispatch("jobCallback", callbackRef, {
          status: "error",
          message: errorMessage
        });
      else socket.dispatch("error", errorMessage);
    }
  }

  /**
   * getSockets - Get websocket clients
   *
   * @returns The server's client set, or undefined if the server has not
   * been created
   */
  public async getSockets() {
    return this._wsServer?.clients;
  }

  /**
   * getSocket - Get websocket client
   *
   * Returns the first client whose socket id equals `socketId` or whose
   * session id equals `sessionId`. A criterion that is not provided is
   * never used for matching.
   *
   * @param socketId - Socket id to look for
   * @param sessionId - Session id to look for
   * @returns The matching socket, or null if there is none
   */
  public async getSocket(socketId?: string, sessionId?: Types.ObjectId) {
    if (!this._wsServer) return null;

    // Without this guard an omitted criterion (undefined) would "match"
    // any socket whose corresponding id is also undefined.
    if (socketId == null && sessionId == null) return null;

    // Compare session ids by string form; an ObjectId instance is never
    // strictly equal to a string or to another ObjectId instance.
    // NOTE(review): assumes getSessionId() returns a string or ObjectId
    const wantedSessionId = sessionId?.toString();

    for (const socket of this._wsServer.clients as Set<WebSocket>) {
      if (socketId != null && socket.getSocketId() === socketId)
        return socket;

      if (
        wantedSessionId != null &&
        socket.getSessionId()?.toString() === wantedSessionId
      )
        return socket;
    }

    return null;
  }

  /**
   * dispatch - Dispatch message to socket
   *
   * Does nothing if no socket with the given id is connected.
   *
   * @param socketId - Id of the socket to dispatch to
   * @param channel - Channel to dispatch on
   * @param values - Values passed through to the socket's dispatch
   */
  public async dispatch(socketId: string, channel: string, ...values) {
    const socket = await this.getSocket(socketId);

    if (!socket) return;

    socket.dispatch(channel, ...values);
  }

  /**
   * shutdown - Shutdown websocket module
   */
  public override async shutdown() {
    await super.shutdown();

    // Stop the keep-alive sweep even if the server "close" event never fires
    if (this._keepAliveInterval) clearInterval(this._keepAliveInterval);

    if (this._httpServer) this._httpServer.close();

    if (this._wsServer) this._wsServer.close();

    await this._stopped();
  }
}
/** The methods of WebSocketModule selected by UniqueMethods. */
type WebSocketModuleMethods = UniqueMethods<WebSocketModule>;

/**
 * Job typings derived from the WebSocketModule methods: for each method,
 * `payload` is the type of its second parameter and `returns` is its
 * awaited return type.
 */
export type WebSocketModuleJobs = {
  [JobName in keyof WebSocketModuleMethods]: {
    payload: Parameters<WebSocketModuleMethods[JobName]>[1];
    returns: Awaited<ReturnType<WebSocketModuleMethods[JobName]>>;
  };
};
/** The WebSocketModule instance created when this file is loaded. */
const webSocketModule = new WebSocketModule();

export default webSocketModule;
|