diff --git a/back/src/Model/GameRoom.ts b/back/src/Model/GameRoom.ts index 020f4c29..33af483f 100644 --- a/back/src/Model/GameRoom.ts +++ b/back/src/Model/GameRoom.ts @@ -34,7 +34,8 @@ export class GameRoom { private readonly connectCallback: ConnectCallback; private readonly disconnectCallback: DisconnectCallback; - private itemsState: Map = new Map(); + private itemsState = new Map(); + private variables = new Map(); private readonly positionNotifier: PositionNotifier; public readonly roomId: string; @@ -309,6 +310,10 @@ export class GameRoom { return this.itemsState; } + public setVariable(name: string, value: string): void { + this.variables.set(name, value); + } + public addZoneListener(call: ZoneSocket, x: number, y: number): Set { return this.positionNotifier.addZoneListener(call, x, y); } diff --git a/back/src/RoomManager.ts b/back/src/RoomManager.ts index 9aaf1edb..2514c576 100644 --- a/back/src/RoomManager.ts +++ b/back/src/RoomManager.ts @@ -16,7 +16,7 @@ import { ServerToAdminClientMessage, ServerToClientMessage, SilentMessage, - UserMovesMessage, + UserMovesMessage, VariableMessage, WebRtcSignalToServerMessage, WorldFullWarningToRoomMessage, ZoneMessage, @@ -72,6 +72,8 @@ const roomManager: IRoomManagerServer = { socketManager.handleSilentMessage(room, user, message.getSilentmessage() as SilentMessage); } else if (message.hasItemeventmessage()) { socketManager.handleItemEvent(room, user, message.getItemeventmessage() as ItemEventMessage); + } else if (message.hasVariablemessage()) { + socketManager.handleVariableEvent(room, user, message.getVariablemessage() as VariableMessage); } else if (message.hasWebrtcsignaltoservermessage()) { socketManager.emitVideo( room, diff --git a/back/src/Services/SocketManager.ts b/back/src/Services/SocketManager.ts index e61763cd..e8356245 100644 --- a/back/src/Services/SocketManager.ts +++ b/back/src/Services/SocketManager.ts @@ -29,7 +29,7 @@ import { EmoteEventMessage, BanUserMessage, RefreshRoomMessage, - EmotePromptMessage, + EmotePromptMessage, VariableMessage, } from "../Messages/generated/messages_pb"; import { User, UserSocket } from "../Model/User"; import { ProtobufUtils } from "../Model/Websocket/ProtobufUtils"; @@ -184,6 +184,28 @@ export class SocketManager { } } + handleVariableEvent(room: GameRoom, user: User, variableMessage: VariableMessage) { + const itemEvent = ProtobufUtils.toItemEvent(itemEventMessage); + + try { + // TODO: DISPATCH ON NEW ROOM CHANNEL + + const subMessage = new SubMessage(); + subMessage.setItemeventmessage(itemEventMessage); + + // Let's send the event without using the SocketIO room. + // TODO: move this in the GameRoom class. + for (const user of room.getUsers().values()) { + user.emitInBatch(subMessage); + } + + room.setVariable(variableMessage.getName(), variableMessage.getValue()); + } catch (e) { + console.error('An error occurred on "item_event"'); + console.error(e); + } + } + emitVideo(room: GameRoom, user: User, data: WebRtcSignalToServerMessage): void { //send only at user const remoteUser = room.getUsers().get(data.getReceiverid()); @@ -425,6 +447,7 @@ export class SocketManager { // Let's send 2 messages: one to the user joining the group and one to the other user const webrtcStartMessage1 = new WebRtcStartMessage(); webrtcStartMessage1.setUserid(otherUser.id); + webrtcStartMessage1.setUseruuid(otherUser.uuid); webrtcStartMessage1.setName(otherUser.name); webrtcStartMessage1.setInitiator(true); if (TURN_STATIC_AUTH_SECRET !== "") { @@ -443,6 +466,7 @@ export class SocketManager { const webrtcStartMessage2 = new WebRtcStartMessage(); webrtcStartMessage2.setUserid(user.id); + webrtcStartMessage2.setUseruuid(user.uuid); webrtcStartMessage2.setName(user.name); webrtcStartMessage2.setInitiator(false); if (TURN_STATIC_AUTH_SECRET !== "") { diff --git a/messages/protos/messages.proto b/messages/protos/messages.proto index 30882cd9..289c0724 100644 --- a/messages/protos/messages.proto +++ b/messages/protos/messages.proto @@ -325,6 +325,10 @@ message ZoneMessage { int32 y = 3; } +message RoomMessage { + string roomId = 1; +} + message PusherToBackMessage { oneof message { JoinRoomMessage joinRoomMessage = 1; @@ -360,10 +364,20 @@ message SubToPusherMessage { SendUserMessage sendUserMessage = 7; BanUserMessage banUserMessage = 8; EmoteEventMessage emoteEventMessage = 9; - VariableMessage variableMessage = 10; } } +message BatchToPusherRoomMessage { + repeated SubToPusherRoomMessage payload = 2; +} + +message SubToPusherRoomMessage { + oneof message { + VariableMessage variableMessage = 1; + } +} + + /*message BatchToAdminPusherMessage { repeated SubToAdminPusherMessage payload = 2; }*/ @@ -433,9 +447,13 @@ message EmptyMessage { } +/** + * Service handled by the "back". Pusher servers connect to this service. + */ service RoomManager { - rpc joinRoom(stream PusherToBackMessage) returns (stream ServerToClientMessage); - rpc listenZone(ZoneMessage) returns (stream BatchToPusherMessage); + rpc joinRoom(stream PusherToBackMessage) returns (stream ServerToClientMessage); // Holds a connection between one given client and the back + rpc listenZone(ZoneMessage) returns (stream BatchToPusherMessage); // Connection used to send to a pusher messages related to a given zone of a given room + rpc listenRoom(RoomMessage) returns (stream BatchToPusherRoomMessage); // Connection used to send to a pusher messages related to a given room rpc adminRoom(stream AdminPusherToBackMessage) returns (stream ServerToAdminClientMessage); rpc sendAdminMessage(AdminMessage) returns (EmptyMessage); rpc sendGlobalAdminMessage(AdminGlobalMessage) returns (EmptyMessage); diff --git a/pusher/src/Controller/IoSocketController.ts b/pusher/src/Controller/IoSocketController.ts index 1af9d917..a6fddb34 100644 --- a/pusher/src/Controller/IoSocketController.ts +++ b/pusher/src/Controller/IoSocketController.ts @@ -16,7 +16,7 @@ import { SendUserMessage, ServerToClientMessage, CompanionMessage, - EmotePromptMessage, + EmotePromptMessage, VariableMessage, } from "../Messages/generated/messages_pb"; import { UserMovesMessage } from "../Messages/generated/messages_pb"; import { TemplatedApp } from "uWebSockets.js"; @@ -358,6 +358,8 @@ export class IoSocketController { socketManager.handleSilentMessage(client, message.getSilentmessage() as SilentMessage); } else if (message.hasItemeventmessage()) { socketManager.handleItemEvent(client, message.getItemeventmessage() as ItemEventMessage); + } else if (message.hasVariablemessage()) { + socketManager.handleVariableEvent(client, message.getVariablemessage() as VariableMessage); } else if (message.hasWebrtcsignaltoservermessage()) { socketManager.emitVideo( client, diff --git a/pusher/src/Model/PusherRoom.ts b/pusher/src/Model/PusherRoom.ts index a49fce3e..1eae7a9f 100644 --- a/pusher/src/Model/PusherRoom.ts +++ b/pusher/src/Model/PusherRoom.ts @@ -3,10 +3,21 @@ import { PositionDispatcher } from "./PositionDispatcher"; import { ViewportInterface } from "_Model/Websocket/ViewportMessage"; import { extractDataFromPrivateRoomId, extractRoomSlugPublicRoomId, isRoomAnonymous } from "./RoomIdentifier"; import { arrayIntersect } from "../Services/ArrayHelper"; -import { ZoneEventListener } from "_Model/Zone"; +import {GroupDescriptor, UserDescriptor, ZoneEventListener} from "_Model/Zone"; +import {apiClientRepository} from "../Services/ApiClientRepository"; +import { + BatchToPusherMessage, BatchToPusherRoomMessage, EmoteEventMessage, GroupLeftZoneMessage, + GroupUpdateZoneMessage, RoomMessage, SubMessage, + UserJoinedZoneMessage, UserLeftZoneMessage, UserMovedMessage, VariableMessage, + ZoneMessage +} from "../Messages/generated/messages_pb"; +import Debug from "debug"; +import {ClientReadableStream} from "grpc"; + +const debug = Debug("room"); export enum GameRoomPolicyTypes { - ANONYMUS_POLICY = 1, + ANONYMOUS_POLICY = 1, MEMBERS_ONLY_POLICY, USE_TAGS_POLICY, } @@ -20,11 +31,14 @@ export class PusherRoom { public readonly worldSlug: string = ""; public readonly organizationSlug: string = ""; private versionNumber: number = 1; + private backConnection!: ClientReadableStream; + private isClosing: boolean = false; + private listeners: Set = new Set(); - constructor(public readonly roomId: string, private socketListener: ZoneEventListener) { + constructor(public readonly roomId: string, private socketListener: ZoneEventListener, private onBackFailure: (e: Error | null, room: PusherRoom) => void) { this.public = isRoomAnonymous(roomId); this.tags = []; - this.policyType = GameRoomPolicyTypes.ANONYMUS_POLICY; + this.policyType = GameRoomPolicyTypes.ANONYMOUS_POLICY; if (this.public) { this.roomSlug = extractRoomSlugPublicRoomId(this.roomId); @@ -43,8 +57,13 @@ export class PusherRoom { this.positionNotifier.setViewport(socket, viewport); } + public join(socket: ExSocketInterface) { + this.listeners.add(socket); + } + public leave(socket: ExSocketInterface) { this.positionNotifier.removeViewport(socket); + this.listeners.delete(socket); } public canAccess(userTags: string[]): boolean { @@ -63,4 +82,56 @@ export class PusherRoom { return false; } } + + /** + * Creates a connection to the back server to track global messages relative to this room (like variable changes). + */ + public async init(): Promise { + debug("Opening connection to room %s on back server", this.roomId); + const apiClient = await apiClientRepository.getClient(this.roomId); + const roomMessage = new RoomMessage(); + roomMessage.setRoomid(this.roomId); + this.backConnection = apiClient.listenRoom(roomMessage); + this.backConnection.on("data", (batch: BatchToPusherRoomMessage) => { + for (const message of batch.getPayloadList()) { + if (message.hasVariablemessage()) { + const variableMessage = message.getVariablemessage() as VariableMessage; + // We need to dispatch this variable to all the listeners + for (const listener of this.listeners) { + const subMessage = new SubMessage(); + subMessage.setVariablemessage(variableMessage); + listener.emitInBatch(subMessage); + } + } else { + throw new Error("Unexpected message"); + } + } + }); + + this.backConnection.on("error", (e) => { + if (!this.isClosing) { + debug("Error on back connection"); + this.close(); + this.onBackFailure(e, this); + } + }); + this.backConnection.on("close", () => { + if (!this.isClosing) { + debug("Close on back connection"); + this.close(); + this.onBackFailure(null, this); + } + }); + } + + public close(): void { + debug("Closing connection to room %s on back server", this.roomId); + this.isClosing = true; + this.backConnection.cancel(); + + // Let's close all connections linked to that room + for (const listener of this.listeners) { + listener.close(); + } + } } diff --git a/pusher/src/Services/SocketManager.ts b/pusher/src/Services/SocketManager.ts index 8a0d3673..6c78d398 100644 --- a/pusher/src/Services/SocketManager.ts +++ b/pusher/src/Services/SocketManager.ts @@ -29,7 +29,7 @@ import { AdminMessage, BanMessage, RefreshRoomMessage, - EmotePromptMessage, + EmotePromptMessage, VariableMessage, } from "../Messages/generated/messages_pb"; import { ProtobufUtils } from "../Model/Websocket/ProtobufUtils"; import { JITSI_ISS, SECRET_JITSI_KEY } from "../Enum/EnvironmentVariable"; @@ -227,6 +227,9 @@ export class SocketManager implements ZoneEventListener { const pusherToBackMessage = new PusherToBackMessage(); pusherToBackMessage.setJoinroommessage(joinRoomMessage); streamToPusher.write(pusherToBackMessage); + + const pusherRoom = await this.getOrCreateRoom(client.roomId); + pusherRoom.join(client); } catch (e) { console.error('An error occurred on "join_room" event'); console.error(e); @@ -300,6 +303,13 @@ export class SocketManager implements ZoneEventListener { client.backConnection.write(pusherToBackMessage); } + handleVariableEvent(client: ExSocketInterface, variableMessage: VariableMessage) { + const pusherToBackMessage = new PusherToBackMessage(); + pusherToBackMessage.setVariablemessage(variableMessage); + + client.backConnection.write(pusherToBackMessage); + } + async handleReportMessage(client: ExSocketInterface, reportPlayerMessage: ReportPlayerMessage) { try { const reportedSocket = this.sockets.get(reportPlayerMessage.getReporteduserid()); @@ -334,14 +344,6 @@ export class SocketManager implements ZoneEventListener { socket.backConnection.write(pusherToBackMessage); } - private searchClientByIdOrFail(userId: number): ExSocketInterface { - const client: ExSocketInterface | undefined = this.sockets.get(userId); - if (client === undefined) { - throw new Error("Could not find user with id " + userId); - } - return client; - } - leaveRoom(socket: ExSocketInterface) { // leave previous room and world try { @@ -354,6 +356,7 @@ export class SocketManager implements ZoneEventListener { room.leave(socket); if (room.isEmpty()) { + room.close(); this.rooms.delete(socket.roomId); debug("Room %s is empty. Deleting.", socket.roomId); } @@ -384,9 +387,10 @@ export class SocketManager implements ZoneEventListener { if (!world.public) { await this.updateRoomWithAdminData(world); } + await world.init(); this.rooms.set(roomId, world); } - return Promise.resolve(world); + return world; } public async updateRoomWithAdminData(world: PusherRoom): Promise { @@ -410,15 +414,6 @@ export class SocketManager implements ZoneEventListener { return this.rooms; } - searchClientByUuid(uuid: string): ExSocketInterface | null { - for (const socket of this.sockets.values()) { - if (socket.userUuid === uuid) { - return socket; - } - } - return null; - } - public handleQueryJitsiJwtMessage(client: ExSocketInterface, queryJitsiJwtMessage: QueryJitsiJwtMessage) { try { const room = queryJitsiJwtMessage.getJitsiroom();