Starting adding variables server-side

This commit is contained in:
David Négrier 2021-07-06 15:30:49 +02:00
parent cb78ff333b
commit a1f1927b6d
7 changed files with 147 additions and 30 deletions

View File

@ -34,7 +34,8 @@ export class GameRoom {
private readonly connectCallback: ConnectCallback;
private readonly disconnectCallback: DisconnectCallback;
private itemsState: Map<number, unknown> = new Map<number, unknown>();
private itemsState = new Map<number, unknown>();
private variables = new Map<string, string>();
private readonly positionNotifier: PositionNotifier;
public readonly roomId: string;
@ -309,6 +310,10 @@ export class GameRoom {
return this.itemsState;
}
public setVariable(name: string, value: string): void {
this.variables.set(name, value);
}
public addZoneListener(call: ZoneSocket, x: number, y: number): Set<Movable> {
return this.positionNotifier.addZoneListener(call, x, y);
}

View File

@ -16,7 +16,7 @@ import {
ServerToAdminClientMessage,
ServerToClientMessage,
SilentMessage,
UserMovesMessage,
UserMovesMessage, VariableMessage,
WebRtcSignalToServerMessage,
WorldFullWarningToRoomMessage,
ZoneMessage,
@ -72,6 +72,8 @@ const roomManager: IRoomManagerServer = {
socketManager.handleSilentMessage(room, user, message.getSilentmessage() as SilentMessage);
} else if (message.hasItemeventmessage()) {
socketManager.handleItemEvent(room, user, message.getItemeventmessage() as ItemEventMessage);
} else if (message.hasVariablemessage()) {
socketManager.handleVariableEvent(room, user, message.getVariablemessage() as VariableMessage);
} else if (message.hasWebrtcsignaltoservermessage()) {
socketManager.emitVideo(
room,

View File

@ -29,7 +29,7 @@ import {
EmoteEventMessage,
BanUserMessage,
RefreshRoomMessage,
EmotePromptMessage,
EmotePromptMessage, VariableMessage,
} from "../Messages/generated/messages_pb";
import { User, UserSocket } from "../Model/User";
import { ProtobufUtils } from "../Model/Websocket/ProtobufUtils";
@ -184,6 +184,28 @@ export class SocketManager {
}
}
handleVariableEvent(room: GameRoom, user: User, variableMessage: VariableMessage) {
const itemEvent = ProtobufUtils.toItemEvent(itemEventMessage);
try {
// TODO: DISPATCH ON NEW ROOM CHANNEL
const subMessage = new SubMessage();
subMessage.setItemeventmessage(itemEventMessage);
// Let's send the event without using the SocketIO room.
// TODO: move this in the GameRoom class.
for (const user of room.getUsers().values()) {
user.emitInBatch(subMessage);
}
room.setVariable(variableMessage.getName(), variableMessage.getValue());
} catch (e) {
console.error('An error occurred on "item_event"');
console.error(e);
}
}
emitVideo(room: GameRoom, user: User, data: WebRtcSignalToServerMessage): void {
//send only at user
const remoteUser = room.getUsers().get(data.getReceiverid());
@ -425,6 +447,7 @@ export class SocketManager {
// Let's send 2 messages: one to the user joining the group and one to the other user
const webrtcStartMessage1 = new WebRtcStartMessage();
webrtcStartMessage1.setUserid(otherUser.id);
webrtcStartMessage1.setUseruuid(otherUser.uuid);
webrtcStartMessage1.setName(otherUser.name);
webrtcStartMessage1.setInitiator(true);
if (TURN_STATIC_AUTH_SECRET !== "") {
@ -443,6 +466,7 @@ export class SocketManager {
const webrtcStartMessage2 = new WebRtcStartMessage();
webrtcStartMessage2.setUserid(user.id);
webrtcStartMessage2.setUseruuid(user.uuid);
webrtcStartMessage2.setName(user.name);
webrtcStartMessage2.setInitiator(false);
if (TURN_STATIC_AUTH_SECRET !== "") {

View File

@ -325,6 +325,10 @@ message ZoneMessage {
int32 y = 3;
}
message RoomMessage {
string roomId = 1;
}
message PusherToBackMessage {
oneof message {
JoinRoomMessage joinRoomMessage = 1;
@ -360,10 +364,20 @@ message SubToPusherMessage {
SendUserMessage sendUserMessage = 7;
BanUserMessage banUserMessage = 8;
EmoteEventMessage emoteEventMessage = 9;
VariableMessage variableMessage = 10;
}
}
message BatchToPusherRoomMessage {
repeated SubToPusherRoomMessage payload = 2;
}
message SubToPusherRoomMessage {
oneof message {
VariableMessage variableMessage = 1;
}
}
/*message BatchToAdminPusherMessage {
repeated SubToAdminPusherMessage payload = 2;
}*/
@ -433,9 +447,13 @@ message EmptyMessage {
}
/**
* Service handled by the "back". Pusher servers connect to this service.
*/
service RoomManager {
rpc joinRoom(stream PusherToBackMessage) returns (stream ServerToClientMessage);
rpc listenZone(ZoneMessage) returns (stream BatchToPusherMessage);
rpc joinRoom(stream PusherToBackMessage) returns (stream ServerToClientMessage); // Holds a connection between one given client and the back
rpc listenZone(ZoneMessage) returns (stream BatchToPusherMessage); // Connection used to send to a pusher messages related to a given zone of a given room
rpc listenRoom(RoomMessage) returns (stream BatchToPusherRoomMessage); // Connection used to send to a pusher messages related to a given room
rpc adminRoom(stream AdminPusherToBackMessage) returns (stream ServerToAdminClientMessage);
rpc sendAdminMessage(AdminMessage) returns (EmptyMessage);
rpc sendGlobalAdminMessage(AdminGlobalMessage) returns (EmptyMessage);

View File

@ -16,7 +16,7 @@ import {
SendUserMessage,
ServerToClientMessage,
CompanionMessage,
EmotePromptMessage,
EmotePromptMessage, VariableMessage,
} from "../Messages/generated/messages_pb";
import { UserMovesMessage } from "../Messages/generated/messages_pb";
import { TemplatedApp } from "uWebSockets.js";
@ -358,6 +358,8 @@ export class IoSocketController {
socketManager.handleSilentMessage(client, message.getSilentmessage() as SilentMessage);
} else if (message.hasItemeventmessage()) {
socketManager.handleItemEvent(client, message.getItemeventmessage() as ItemEventMessage);
} else if (message.hasVariablemessage()) {
socketManager.handleVariableEvent(client, message.getVariablemessage() as VariableMessage);
} else if (message.hasWebrtcsignaltoservermessage()) {
socketManager.emitVideo(
client,

View File

@ -3,10 +3,21 @@ import { PositionDispatcher } from "./PositionDispatcher";
import { ViewportInterface } from "_Model/Websocket/ViewportMessage";
import { extractDataFromPrivateRoomId, extractRoomSlugPublicRoomId, isRoomAnonymous } from "./RoomIdentifier";
import { arrayIntersect } from "../Services/ArrayHelper";
import { ZoneEventListener } from "_Model/Zone";
import {GroupDescriptor, UserDescriptor, ZoneEventListener} from "_Model/Zone";
import {apiClientRepository} from "../Services/ApiClientRepository";
import {
BatchToPusherMessage, BatchToPusherRoomMessage, EmoteEventMessage, GroupLeftZoneMessage,
GroupUpdateZoneMessage, RoomMessage, SubMessage,
UserJoinedZoneMessage, UserLeftZoneMessage, UserMovedMessage, VariableMessage,
ZoneMessage
} from "../Messages/generated/messages_pb";
import Debug from "debug";
import {ClientReadableStream} from "grpc";
const debug = Debug("room");
export enum GameRoomPolicyTypes {
ANONYMUS_POLICY = 1,
ANONYMOUS_POLICY = 1,
MEMBERS_ONLY_POLICY,
USE_TAGS_POLICY,
}
@ -20,11 +31,14 @@ export class PusherRoom {
public readonly worldSlug: string = "";
public readonly organizationSlug: string = "";
private versionNumber: number = 1;
private backConnection!: ClientReadableStream<BatchToPusherRoomMessage>;
private isClosing: boolean = false;
private listeners: Set<ExSocketInterface> = new Set<ExSocketInterface>();
constructor(public readonly roomId: string, private socketListener: ZoneEventListener) {
constructor(public readonly roomId: string, private socketListener: ZoneEventListener, private onBackFailure: (e: Error | null, room: PusherRoom) => void) {
this.public = isRoomAnonymous(roomId);
this.tags = [];
this.policyType = GameRoomPolicyTypes.ANONYMUS_POLICY;
this.policyType = GameRoomPolicyTypes.ANONYMOUS_POLICY;
if (this.public) {
this.roomSlug = extractRoomSlugPublicRoomId(this.roomId);
@ -43,8 +57,13 @@ export class PusherRoom {
this.positionNotifier.setViewport(socket, viewport);
}
public join(socket: ExSocketInterface) {
this.listeners.add(socket);
}
public leave(socket: ExSocketInterface) {
this.positionNotifier.removeViewport(socket);
this.listeners.delete(socket);
}
public canAccess(userTags: string[]): boolean {
@ -63,4 +82,56 @@ export class PusherRoom {
return false;
}
}
/**
* Creates a connection to the back server to track global messages relative to this room (like variable changes).
*/
public async init(): Promise<void> {
debug("Opening connection to room %s on back server", this.roomId);
const apiClient = await apiClientRepository.getClient(this.roomId);
const roomMessage = new RoomMessage();
roomMessage.setRoomid(this.roomId);
this.backConnection = apiClient.listenRoom(roomMessage);
this.backConnection.on("data", (batch: BatchToPusherRoomMessage) => {
for (const message of batch.getPayloadList()) {
if (message.hasVariablemessage()) {
const variableMessage = message.getVariablemessage() as VariableMessage;
// We need to dispatch this variable to all the listeners
for (const listener of this.listeners) {
const subMessage = new SubMessage();
subMessage.setVariablemessage(variableMessage);
listener.emitInBatch(subMessage);
}
} else {
throw new Error("Unexpected message");
}
}
});
this.backConnection.on("error", (e) => {
if (!this.isClosing) {
debug("Error on back connection");
this.close();
this.onBackFailure(e, this);
}
});
this.backConnection.on("close", () => {
if (!this.isClosing) {
debug("Close on back connection");
this.close();
this.onBackFailure(null, this);
}
});
}
public close(): void {
debug("Closing connection to room %s on back server", this.roomId);
this.isClosing = true;
this.backConnection.cancel();
// Let's close all connections linked to that room
for (const listener of this.listeners) {
listener.close();
}
}
}

View File

@ -29,7 +29,7 @@ import {
AdminMessage,
BanMessage,
RefreshRoomMessage,
EmotePromptMessage,
EmotePromptMessage, VariableMessage,
} from "../Messages/generated/messages_pb";
import { ProtobufUtils } from "../Model/Websocket/ProtobufUtils";
import { JITSI_ISS, SECRET_JITSI_KEY } from "../Enum/EnvironmentVariable";
@ -227,6 +227,9 @@ export class SocketManager implements ZoneEventListener {
const pusherToBackMessage = new PusherToBackMessage();
pusherToBackMessage.setJoinroommessage(joinRoomMessage);
streamToPusher.write(pusherToBackMessage);
const pusherRoom = await this.getOrCreateRoom(client.roomId);
pusherRoom.join(client);
} catch (e) {
console.error('An error occurred on "join_room" event');
console.error(e);
@ -300,6 +303,13 @@ export class SocketManager implements ZoneEventListener {
client.backConnection.write(pusherToBackMessage);
}
handleVariableEvent(client: ExSocketInterface, variableMessage: VariableMessage) {
const pusherToBackMessage = new PusherToBackMessage();
pusherToBackMessage.setVariablemessage(variableMessage);
client.backConnection.write(pusherToBackMessage);
}
async handleReportMessage(client: ExSocketInterface, reportPlayerMessage: ReportPlayerMessage) {
try {
const reportedSocket = this.sockets.get(reportPlayerMessage.getReporteduserid());
@ -334,14 +344,6 @@ export class SocketManager implements ZoneEventListener {
socket.backConnection.write(pusherToBackMessage);
}
private searchClientByIdOrFail(userId: number): ExSocketInterface {
const client: ExSocketInterface | undefined = this.sockets.get(userId);
if (client === undefined) {
throw new Error("Could not find user with id " + userId);
}
return client;
}
leaveRoom(socket: ExSocketInterface) {
// leave previous room and world
try {
@ -354,6 +356,7 @@ export class SocketManager implements ZoneEventListener {
room.leave(socket);
if (room.isEmpty()) {
room.close();
this.rooms.delete(socket.roomId);
debug("Room %s is empty. Deleting.", socket.roomId);
}
@ -384,9 +387,10 @@ export class SocketManager implements ZoneEventListener {
if (!world.public) {
await this.updateRoomWithAdminData(world);
}
await world.init();
this.rooms.set(roomId, world);
}
return Promise.resolve(world);
return world;
}
public async updateRoomWithAdminData(world: PusherRoom): Promise<void> {
@ -410,15 +414,6 @@ export class SocketManager implements ZoneEventListener {
return this.rooms;
}
searchClientByUuid(uuid: string): ExSocketInterface | null {
for (const socket of this.sockets.values()) {
if (socket.userUuid === uuid) {
return socket;
}
}
return null;
}
public handleQueryJitsiJwtMessage(client: ExSocketInterface, queryJitsiJwtMessage: QueryJitsiJwtMessage) {
try {
const room = queryJitsiJwtMessage.getJitsiroom();