diff --git a/src/broadcast/broadcast_manager.ts b/src/broadcast/broadcast_manager.ts index cb36649c7..54d1220ee 100644 --- a/src/broadcast/broadcast_manager.ts +++ b/src/broadcast/broadcast_manager.ts @@ -15,6 +15,19 @@ const SLIPPI_WS_SERVER = process.env.SLIPPI_WS_SERVER; // support disconnects of 30 seconds at most const BACKUP_MAX_LENGTH = 1800; +const CONNECTING_SUB_STEP_INITIAL_TIMEOUT = 2000; +enum ConnectingSubStep { + NONE = "NONE", + SOCKET = "SOCKET", + GET = "GET", + START = "START", +} +type ConnectingSubState = { + step: ConnectingSubStep; + broadcastId: string | null; + timeout: number; +}; + /** * Responsible for retrieving Dolphin game data over enet and sending the data * to the Slippi server over websockets. @@ -27,16 +40,25 @@ export class BroadcastManager extends EventEmitter { private nextGameCursor: number | null; private slippiStatus: ConnectionStatus; + private wsClient: WebSocketClient | null; private wsConnection: connection | null; private dolphinConnection: DolphinConnection; + private connectingSubState: ConnectingSubState; + constructor() { super(); this.broadcastId = null; this.isBroadcastReady = false; + this.wsClient = null; this.wsConnection = null; this.incomingEvents = []; this.slippiStatus = ConnectionStatus.DISCONNECTED; + this.connectingSubState = { + step: ConnectingSubStep.NONE, + broadcastId: null, + timeout: CONNECTING_SUB_STEP_INITIAL_TIMEOUT, + }; // We need to store events as we process them in the event that we get a disconnect and // we need to re-send some events to the server @@ -106,6 +128,13 @@ export class BroadcastManager extends EventEmitter { return; } + // If we don't have a WS connection but we do have a WS client we're somewhere mid-connecting. Just start over. + if (this.wsClient) { + this.wsClient.removeAllListeners(); + this.wsClient.abort(); + this.wsClient = null; + } + // Indicate we're connecting to the Slippi server this._setSlippiStatus(ConnectionStatus.CONNECTING); @@ -115,9 +144,9 @@ export class BroadcastManager extends EventEmitter { authorization: `Bearer ${config.authToken}`, }; - const socket = new WebSocketClient({ disableNagleAlgorithm: true }); + this.wsClient = new WebSocketClient({ disableNagleAlgorithm: true }); - socket.on("connectFailed", (err) => { + this.wsClient.on("connectFailed", (err) => { this.emit(BroadcastEvent.LOG, `WS failed to connect`); const label = "x-websocket-reject-reason: "; @@ -129,10 +158,24 @@ export class BroadcastManager extends EventEmitter { } this.emit(BroadcastEvent.ERROR, message); - this.stop(); + + const currentTimeout = this.connectingSubState.timeout; + setTimeout(() => { + if (this.wsClient) { + this.emit( + BroadcastEvent.LOG, + `Retrying connecting sub step: ${this.connectingSubState.step} after ${currentTimeout}ms`, + ); + this.wsClient.connect(SLIPPI_WS_SERVER, "broadcast-protocol", undefined, headers); + } + }, currentTimeout); + this.connectingSubState.timeout *= 2; }); - socket.on("connect", (connection: connection) => { + this.wsClient.on("connect", (connection: connection) => { + this.connectingSubState.step = ConnectingSubStep.GET; + this.connectingSubState.timeout = CONNECTING_SUB_STEP_INITIAL_TIMEOUT; + this.emit(BroadcastEvent.LOG, "WS connection successful"); this.wsConnection = connection; @@ -171,6 +214,11 @@ export class BroadcastManager extends EventEmitter { this.isBroadcastReady = true; this.broadcastId = broadcastId; + this.connectingSubState = { + step: ConnectingSubStep.NONE, + broadcastId: null, + timeout: CONNECTING_SUB_STEP_INITIAL_TIMEOUT, + }; this._setSlippiStatus(ConnectionStatus.CONNECTED); // Process any events that may have been missed when we disconnected @@ -231,7 +279,7 @@ export class BroadcastManager extends EventEmitter { case "start-broadcast-resp": { if (message.recoveryGameCursor !== undefined) { const firstIncoming = this.incomingEvents[0]; - let firstCursor: number | null | undefined; + let firstCursor: number | undefined; if (firstIncoming) { firstCursor = firstIncoming.cursor; } @@ -247,7 +295,7 @@ export class BroadcastManager extends EventEmitter { const isNeededByServer = event.cursor > message.recoveryGameCursor; // Make sure we aren't duplicating anything that is already in the incoming events array - const isNotIncoming = firstCursor == null || event.cursor < firstCursor; + const isNotIncoming = firstCursor === undefined || event.cursor < firstCursor; return isNeededByServer && isNotIncoming; } @@ -256,13 +304,11 @@ export class BroadcastManager extends EventEmitter { this.incomingEvents = [...backedEventsToUse, ...this.incomingEvents]; + let newFirstCursor: number | undefined; const newFirstEvent = this.incomingEvents[0]; - if (!newFirstEvent) { - this.emit(BroadcastEvent.LOG, "Missing new first event"); - return; + if (newFirstEvent) { + newFirstCursor = newFirstEvent.cursor; } - const newFirstCursor = newFirstEvent.cursor; - const firstBackupCursor = (this.backupEvents[0] || {}).cursor; const lastBackupCursor = (last(this.backupEvents) || {}).cursor; @@ -277,6 +323,9 @@ export class BroadcastManager extends EventEmitter { break; } case "get-broadcasts-resp": { + this.connectingSubState.step = ConnectingSubStep.START; + this.connectingSubState.timeout = CONNECTING_SUB_STEP_INITIAL_TIMEOUT; + const broadcasts = message.broadcasts || []; // Grab broadcastId we were currently using if the broadcast still exists, would happen @@ -286,6 +335,8 @@ export class BroadcastManager extends EventEmitter { const prevBroadcast = broadcastsById[this.broadcastId]; if (prevBroadcast) { + this.connectingSubState.broadcastId = prevBroadcast.id; + // TODO: Figure out if this config.name guaranteed to be the correct name? startBroadcast(prevBroadcast.id, config.name).catch(console.warn); return; @@ -305,10 +356,33 @@ export class BroadcastManager extends EventEmitter { }); getBroadcasts().catch(console.warn); + const postSocketConnectingSubStepRetry = () => { + if (this.connectingSubState.step === ConnectingSubStep.NONE) { + return; + } + + this.emit( + BroadcastEvent.LOG, + `Retrying connecting sub step: ${this.connectingSubState.step} after ${this.connectingSubState.timeout}ms`, + ); + this.connectingSubState.timeout *= 2; + if (this.connectingSubState.step === ConnectingSubStep.GET) { + getBroadcasts().catch(console.warn); + } else { + startBroadcast(this.connectingSubState.broadcastId, config.name).catch(console.warn); + } + setTimeout(() => { + postSocketConnectingSubStepRetry(); + }, this.connectingSubState.timeout); + }; + setTimeout(() => { + postSocketConnectingSubStepRetry(); + }, CONNECTING_SUB_STEP_INITIAL_TIMEOUT); }); this.emit(BroadcastEvent.LOG, "Connecting to WS service"); - socket.connect(SLIPPI_WS_SERVER, "broadcast-protocol", undefined, headers); + this.wsClient.connect(SLIPPI_WS_SERVER, "broadcast-protocol", undefined, headers); + this.connectingSubState.step = ConnectingSubStep.SOCKET; } public stop() { @@ -326,20 +400,28 @@ export class BroadcastManager extends EventEmitter { return; } - if (this.wsConnection && this.broadcastId) { + if (this.wsConnection) { this.emit(BroadcastEvent.LOG, "Disconnecting ws connection..."); - this.wsConnection.send( - JSON.stringify({ - type: "stop-broadcast", - broadcastId: this.broadcastId, - }), - ); + if (this.broadcastId) { + this.wsConnection.send( + JSON.stringify({ + type: "stop-broadcast", + broadcastId: this.broadcastId, + }), + ); + } this.wsConnection.close(); this.wsConnection = null; } + if (this.wsClient) { + this.wsClient.removeAllListeners(); + this.wsClient.abort(); + this.wsClient = null; + } + // Clear incoming events this.incomingEvents = []; }