From 4cff230256cde114efe1eb97da08e854676d1312 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20N=C3=A9grier?= Date: Wed, 1 Dec 2021 10:12:07 +0100 Subject: [PATCH] Creating only one WS connection to pusher from admin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Also: migration to Typescript 4.5 and µWebsockets 1.20.4 --- pusher/package.json | 6 +- pusher/src/Controller/AdminController.ts | 4 +- pusher/src/Controller/InvalidTokenError.ts | 9 + pusher/src/Controller/IoSocketController.ts | 214 ++++++++++++------ .../Model/Websocket/Admin/AdminMessages.ts | 30 +++ pusher/src/Services/JWTTokenManager.ts | 8 +- pusher/src/Services/SocketManager.ts | 6 + pusher/yarn.lock | 16 +- 8 files changed, 206 insertions(+), 87 deletions(-) create mode 100644 pusher/src/Controller/InvalidTokenError.ts create mode 100644 pusher/src/Model/Websocket/Admin/AdminMessages.ts diff --git a/pusher/package.json b/pusher/package.json index dacbfd72..bbef80fa 100644 --- a/pusher/package.json +++ b/pusher/package.json @@ -52,7 +52,7 @@ "openid-client": "^4.7.4", "prom-client": "^12.0.0", "query-string": "^6.13.3", - "uWebSockets.js": "uNetworking/uWebSockets.js#v18.5.0", + "uWebSockets.js": "uNetworking/uWebSockets.js#v20.4.0", "uuidv4": "^6.0.7" }, "devDependencies": { @@ -71,8 +71,8 @@ "jasmine": "^3.5.0", "lint-staged": "^11.0.0", "prettier": "^2.3.1", - "ts-node-dev": "^1.0.0-pre.44", - "typescript": "^3.8.3" + "ts-node-dev": "^1.1.8", + "typescript": "^4.5.2" }, "lint-staged": { "*.ts": [ diff --git a/pusher/src/Controller/AdminController.ts b/pusher/src/Controller/AdminController.ts index ec1bd067..bf514fa0 100644 --- a/pusher/src/Controller/AdminController.ts +++ b/pusher/src/Controller/AdminController.ts @@ -44,7 +44,7 @@ export class AdminController extends BaseController { const roomId: string = body.roomId; await apiClientRepository.getClient(roomId).then((roomClient) => { - return new Promise((res, rej) => { + return new Promise((res, rej) => { const roomMessage = new RefreshRoomPromptMessage(); roomMessage.setRoomid(roomId); @@ -101,7 +101,7 @@ export class AdminController extends BaseController { await Promise.all( targets.map((roomId) => { return apiClientRepository.getClient(roomId).then((roomClient) => { - return new Promise((res, rej) => { + return new Promise((res, rej) => { if (type === "message") { const roomMessage = new AdminRoomMessage(); roomMessage.setMessage(text); diff --git a/pusher/src/Controller/InvalidTokenError.ts b/pusher/src/Controller/InvalidTokenError.ts new file mode 100644 index 00000000..2d0c066d --- /dev/null +++ b/pusher/src/Controller/InvalidTokenError.ts @@ -0,0 +1,9 @@ +/** + * Errors related to variable handling. + */ +export class InvalidTokenError extends Error { + constructor(message: string) { + super(message); + Object.setPrototypeOf(this, InvalidTokenError.prototype); + } +} diff --git a/pusher/src/Controller/IoSocketController.ts b/pusher/src/Controller/IoSocketController.ts index 35fd08d5..4c09638a 100644 --- a/pusher/src/Controller/IoSocketController.ts +++ b/pusher/src/Controller/IoSocketController.ts @@ -22,14 +22,17 @@ import { import { UserMovesMessage } from "../Messages/generated/messages_pb"; import { TemplatedApp } from "uWebSockets.js"; import { parse } from "query-string"; -import { jwtTokenManager, tokenInvalidException } from "../Services/JWTTokenManager"; +import { AdminSocketTokenData, jwtTokenManager, tokenInvalidException } from "../Services/JWTTokenManager"; import { adminApi, FetchMemberDataByUuidResponse } from "../Services/AdminApi"; import { SocketManager, socketManager } from "../Services/SocketManager"; import { emitInBatch } from "../Services/IoSocketHelpers"; -import { ADMIN_SOCKETS_TOKEN, ADMIN_API_URL, DISABLE_ANONYMOUS, SOCKET_IDLE_TIMER } from "../Enum/EnvironmentVariable"; +import { ADMIN_API_URL, DISABLE_ANONYMOUS, SOCKET_IDLE_TIMER } from "../Enum/EnvironmentVariable"; import { Zone } from "_Model/Zone"; import { ExAdminSocketInterface } from "_Model/Websocket/ExAdminSocketInterface"; import { CharacterTexture } from "../Services/AdminApi/CharacterTexture"; +import { isAdminMessageInterface } from "../Model/Websocket/Admin/AdminMessages"; +import Axios from "axios"; +import { InvalidTokenError } from "../Controller/InvalidTokenError"; export class IoSocketController { private nextUserId: number = 1; @@ -42,59 +45,108 @@ export class IoSocketController { adminRoomSocket() { this.app.ws("/admin/rooms", { upgrade: (res, req, context) => { - const query = parse(req.getQuery()); const websocketKey = req.getHeader("sec-websocket-key"); const websocketProtocol = req.getHeader("sec-websocket-protocol"); const websocketExtensions = req.getHeader("sec-websocket-extensions"); - const token = query.token; - let authorizedRoomIds: string[]; - try { - const data = jwtTokenManager.verifyAdminSocketToken(token as string); - authorizedRoomIds = data.authorizedRoomIds; - } catch (e) { - console.error("Admin access refused for token: " + token); - res.writeStatus("401 Unauthorized").end("Incorrect token"); - return; - } - const roomId = query.roomId; - if (typeof roomId !== "string" || !authorizedRoomIds.includes(roomId)) { - console.error("Invalid room id"); - res.writeStatus("403 Bad Request").end("Invalid room id"); - return; - } - res.upgrade({ roomId }, websocketKey, websocketProtocol, websocketExtensions, context); + res.upgrade({}, websocketKey, websocketProtocol, websocketExtensions, context); }, open: (ws) => { - console.log("Admin socket connect for room: " + ws.roomId); + console.log("Admin socket connect to client on " + Buffer.from(ws.getRemoteAddressAsText()).toString()); ws.disconnecting = false; - - socketManager.handleAdminRoom(ws as ExAdminSocketInterface, ws.roomId as string); }, message: (ws, arrayBuffer, isBinary): void => { try { - //TODO refactor message type and data - const message: { event: string; message: { type: string; message: unknown; userUuid: string } } = - JSON.parse(new TextDecoder("utf-8").decode(new Uint8Array(arrayBuffer))); + const message = JSON.parse(new TextDecoder("utf-8").decode(new Uint8Array(arrayBuffer))); - if (message.event === "user-message") { - const messageToEmit = message.message as { message: string; type: string; userUuid: string }; - if (messageToEmit.type === "banned") { - socketManager.emitBan( - messageToEmit.userUuid, - messageToEmit.message, - messageToEmit.type, - ws.roomId as string + if (!isAdminMessageInterface(message)) { + console.error("Invalid message received.", message); + ws.send( + JSON.stringify({ + type: "Error", + data: { + message: "Invalid message received! The connection has been closed.", + }, + }) + ); + ws.close(); + return; + } + + const token = message.jwt; + + let data: AdminSocketTokenData; + + try { + data = jwtTokenManager.verifyAdminSocketToken(token); + } catch (e) { + console.error("Admin socket access refused for token: " + token, e); + ws.send( + JSON.stringify({ + type: "Error", + data: { + message: "Admin socket access refused! The connection has been closed.", + }, + }) + ); + ws.close(); + return; + } + + const authorizedRoomIds = data.authorizedRoomIds; + + if (message.event === "listen") { + const notAuthorizedRoom = message.roomIds.filter( + (roomId) => !authorizedRoomIds.includes(roomId) + ); + + if (notAuthorizedRoom.length > 0) { + const errorMessage = `Admin socket refused for client on ${Buffer.from( + ws.getRemoteAddressAsText() + ).toString()} listening of : \n${JSON.stringify(notAuthorizedRoom)}`; + console.error(); + ws.send( + JSON.stringify({ + type: "Error", + data: { + message: errorMessage, + }, + }) ); + ws.close(); + return; } - if (messageToEmit.type === "ban") { - socketManager.emitSendUserMessage( - messageToEmit.userUuid, - messageToEmit.message, - messageToEmit.type, - ws.roomId as string - ); + + for (const roomId of message.roomIds) { + socketManager + .handleAdminRoom(ws as ExAdminSocketInterface, roomId) + .catch((e) => console.error(e)); } + } else if (message.event === "user-message") { + const messageToEmit = message.message; + // Get roomIds of the world where we want broadcast the message + const roomIds = authorizedRoomIds.filter( + (authorizeRoomId) => authorizeRoomId.split("/")[5] === message.world + ); + + for (const roomId of roomIds) { + if (messageToEmit.type === "banned") { + socketManager + .emitBan(messageToEmit.userUuid, messageToEmit.message, messageToEmit.type, roomId) + .catch((error) => console.error(error)); + } else if (messageToEmit.type === "ban") { + socketManager + .emitSendUserMessage( + messageToEmit.userUuid, + messageToEmit.message, + messageToEmit.type, + roomId + ) + .catch((error) => console.error(error)); + } + } + } else { + const tmp: never = message.event; } } catch (err) { console.error(err); @@ -202,28 +254,30 @@ export class IoSocketController { try { userData = await adminApi.fetchMemberDataByUuid(userIdentifier, roomId, IPAddress); } catch (err) { - if (err?.response?.status == 404) { - // If we get an HTTP 404, the token is invalid. Let's perform an anonymous login! + if (Axios.isAxiosError(err)) { + if (err?.response?.status == 404) { + // If we get an HTTP 404, the token is invalid. Let's perform an anonymous login! - console.warn( - 'Cannot find user with email "' + - (userIdentifier || "anonymous") + - '". Performing an anonymous login instead.' - ); - } else if (err?.response?.status == 403) { - // If we get an HTTP 403, the world is full. We need to broadcast a special error to the client. - // we finish immediately the upgrade then we will close the socket as soon as it starts opening. - return res.upgrade( - { - rejected: true, - message: err?.response?.data.message, - status: err?.response?.status, - }, - websocketKey, - websocketProtocol, - websocketExtensions, - context - ); + console.warn( + 'Cannot find user with email "' + + (userIdentifier || "anonymous") + + '". Performing an anonymous login instead.' + ); + } else if (err?.response?.status == 403) { + // If we get an HTTP 403, the world is full. We need to broadcast a special error to the client. + // we finish immediately the upgrade then we will close the socket as soon as it starts opening. + return res.upgrade( + { + rejected: true, + message: err?.response?.data.message, + status: err?.response?.status, + }, + websocketKey, + websocketProtocol, + websocketExtensions, + context + ); + } } else { throw err; } @@ -302,17 +356,31 @@ export class IoSocketController { context ); } catch (e) { - res.upgrade( - { - rejected: true, - reason: e.reason || null, - message: e.message ? e.message : "500 Internal Server Error", - }, - websocketKey, - websocketProtocol, - websocketExtensions, - context - ); + if (e instanceof Error) { + res.upgrade( + { + rejected: true, + reason: e instanceof InvalidTokenError ? tokenInvalidException : null, + message: e.message, + }, + websocketKey, + websocketProtocol, + websocketExtensions, + context + ); + } else { + res.upgrade( + { + rejected: true, + reason: null, + message: "500 Internal Server Error", + }, + websocketKey, + websocketProtocol, + websocketExtensions, + context + ); + } } })(); }, diff --git a/pusher/src/Model/Websocket/Admin/AdminMessages.ts b/pusher/src/Model/Websocket/Admin/AdminMessages.ts new file mode 100644 index 00000000..1d64899a --- /dev/null +++ b/pusher/src/Model/Websocket/Admin/AdminMessages.ts @@ -0,0 +1,30 @@ +import * as tg from "generic-type-guard"; + +export const isBanBannedAdminMessageInterface = new tg.IsInterface() + .withProperties({ + type: tg.isSingletonStringUnion("ban", "banned"), + message: tg.isString, + userUuid: tg.isString, + }) + .get(); + +export const isUserMessageAdminMessageInterface = new tg.IsInterface() + .withProperties({ + event: tg.isSingletonString("user-message"), + message: isBanBannedAdminMessageInterface, + world: tg.isString, + jwt: tg.isString, + }) + .get(); + +export const isListenRoomsMessageInterface = new tg.IsInterface() + .withProperties({ + event: tg.isSingletonString("listen"), + roomIds: tg.isArray(tg.isString), + jwt: tg.isString, + }) + .get(); + +export const isAdminMessageInterface = tg.isUnion(isUserMessageAdminMessageInterface, isListenRoomsMessageInterface); + +export type AdminMessageInterface = tg.GuardedType; diff --git a/pusher/src/Services/JWTTokenManager.ts b/pusher/src/Services/JWTTokenManager.ts index 40b5b824..ddd5b715 100644 --- a/pusher/src/Services/JWTTokenManager.ts +++ b/pusher/src/Services/JWTTokenManager.ts @@ -3,6 +3,7 @@ import { uuid } from "uuidv4"; import Jwt, { verify } from "jsonwebtoken"; import { TokenInterface } from "../Controller/AuthenticateController"; import { adminApi, AdminBannedData } from "../Services/AdminApi"; +import { InvalidTokenError } from "../Controller/InvalidTokenError"; export interface AuthTokenData { identifier: string; //will be a email if logged in or an uuid if anonymous @@ -26,7 +27,12 @@ class JWTTokenManager { try { return Jwt.verify(token, SECRET_KEY, { ignoreExpiration }) as AuthTokenData; } catch (e) { - throw { reason: tokenInvalidException, message: e.message }; + if (e instanceof Error) { + // FIXME: we are loosing the stacktrace here. + throw new InvalidTokenError(e.message); + } else { + throw e; + } } } } diff --git a/pusher/src/Services/SocketManager.ts b/pusher/src/Services/SocketManager.ts index 1761f1bd..083840e4 100644 --- a/pusher/src/Services/SocketManager.ts +++ b/pusher/src/Services/SocketManager.ts @@ -132,6 +132,12 @@ export class SocketManager implements ZoneEventListener { const message = new AdminPusherToBackMessage(); message.setSubscribetoroom(roomId); + console.log( + `Admin socket handle room ${roomId} connections for a client on ${Buffer.from( + client.getRemoteAddressAsText() + ).toString()}` + ); + adminRoomStream.write(message); } diff --git a/pusher/yarn.lock b/pusher/yarn.lock index 820575aa..ead3a9cc 100644 --- a/pusher/yarn.lock +++ b/pusher/yarn.lock @@ -2270,7 +2270,7 @@ tree-kill@^1.2.2: resolved "https://registry.yarnpkg.com/tree-kill/-/tree-kill-1.2.2.tgz#4ca09a9092c88b73a7cdc5e8a01b507b0790a0cc" integrity sha512-L0Orpi8qGpRG//Nd+H90vFB+3iHnue1zSSGmNOOCh1GLJ7rUKVwV2HvijphGQS2UmhUZewS9VgvxYIdgr+fG1A== -ts-node-dev@^1.0.0-pre.44: +ts-node-dev@^1.1.8: version "1.1.8" resolved "https://registry.yarnpkg.com/ts-node-dev/-/ts-node-dev-1.1.8.tgz#95520d8ab9d45fffa854d6668e2f8f9286241066" integrity sha512-Q/m3vEwzYwLZKmV6/0VlFxcZzVV/xcgOt+Tx/VjaaRHyiBcFlV0541yrT09QjzzCxlDZ34OzKjrFAynlmtflEg== @@ -2337,14 +2337,14 @@ type-fest@^0.8.1: resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.8.1.tgz#09e249ebde851d3b1e48d27c105444667f17b83d" integrity sha512-4dbzIzqvjtgiM5rw1k5rEHtBANKmdudhGyBEajN01fEyhaAIhsoKNy6y7+IN93IfpFtwY9iqi7kD+xwKhQsNJA== -typescript@^3.8.3: - version "3.9.10" - resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.9.10.tgz#70f3910ac7a51ed6bef79da7800690b19bf778b8" - integrity sha512-w6fIxVE/H1PkLKcCPsFqKE7Kv7QUwhU8qQY2MueZXWx5cPZdwFupLgKK3vntcK98BtNHZtAF4LA/yl2a7k8R6Q== +typescript@^4.5.2: + version "4.5.2" + resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.5.2.tgz#8ac1fba9f52256fdb06fb89e4122fa6a346c2998" + integrity sha512-5BlMof9H1yGt0P8/WF+wPNw6GfctgGjXp5hkblpyT+8rkASSmkUKMXrxR0Xg8ThVCi/JnHQiKXeBaEwCeQwMFw== -uWebSockets.js@uNetworking/uWebSockets.js#v18.5.0: - version "18.5.0" - resolved "https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/9b1605d2db82981cafe69dbe356e10ce412f5805" +uWebSockets.js@uNetworking/uWebSockets.js#v20.4.0: + version "20.4.0" + resolved "https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/65f39bdff763be3883e6cf18e433dd4fec155845" uri-js@^4.2.2: version "4.4.1"