Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: attempt to fix dangling broadcasts #428

Merged
merged 6 commits into from
Mar 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 101 additions & 19 deletions src/broadcast/broadcast_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ const SLIPPI_WS_SERVER = process.env.SLIPPI_WS_SERVER;
// support disconnects of 30 seconds at most
const BACKUP_MAX_LENGTH = 1800;

const CONNECTING_SUB_STEP_INITIAL_TIMEOUT = 2000;
enum ConnectingSubStep {
NONE = "NONE",
SOCKET = "SOCKET",
GET = "GET",
START = "START",
}
type ConnectingSubState = {
step: ConnectingSubStep;
broadcastId: string | null;
timeout: number;
};

/**
* Responsible for retrieving Dolphin game data over enet and sending the data
* to the Slippi server over websockets.
Expand All @@ -27,16 +40,25 @@ export class BroadcastManager extends EventEmitter {
private nextGameCursor: number | null;
private slippiStatus: ConnectionStatus;

private wsClient: WebSocketClient | null;
private wsConnection: connection | null;
private dolphinConnection: DolphinConnection;

private connectingSubState: ConnectingSubState;

constructor() {
super();
this.broadcastId = null;
this.isBroadcastReady = false;
this.wsClient = null;
this.wsConnection = null;
this.incomingEvents = [];
this.slippiStatus = ConnectionStatus.DISCONNECTED;
this.connectingSubState = {
step: ConnectingSubStep.NONE,
broadcastId: null,
timeout: CONNECTING_SUB_STEP_INITIAL_TIMEOUT,
};

// We need to store events as we process them in the event that we get a disconnect and
// we need to re-send some events to the server
Expand Down Expand Up @@ -106,6 +128,13 @@ export class BroadcastManager extends EventEmitter {
return;
}

// If we don't have a WS connection but we do have a WS client we're somewhere mid-connecting. Just start over.
if (this.wsClient) {
this.wsClient.removeAllListeners();
this.wsClient.abort();
this.wsClient = null;
}

// Indicate we're connecting to the Slippi server
this._setSlippiStatus(ConnectionStatus.CONNECTING);

Expand All @@ -115,9 +144,9 @@ export class BroadcastManager extends EventEmitter {
authorization: `Bearer ${config.authToken}`,
};

const socket = new WebSocketClient({ disableNagleAlgorithm: true });
this.wsClient = new WebSocketClient({ disableNagleAlgorithm: true });

socket.on("connectFailed", (err) => {
this.wsClient.on("connectFailed", (err) => {
this.emit(BroadcastEvent.LOG, `WS failed to connect`);

const label = "x-websocket-reject-reason: ";
Expand All @@ -129,10 +158,24 @@ export class BroadcastManager extends EventEmitter {
}

this.emit(BroadcastEvent.ERROR, message);
this.stop();

const currentTimeout = this.connectingSubState.timeout;
setTimeout(() => {
if (this.wsClient) {
this.emit(
BroadcastEvent.LOG,
`Retrying connecting sub step: ${this.connectingSubState.step} after ${currentTimeout}ms`,
);
this.wsClient.connect(SLIPPI_WS_SERVER, "broadcast-protocol", undefined, headers);
}
}, currentTimeout);
this.connectingSubState.timeout *= 2;
});

socket.on("connect", (connection: connection) => {
this.wsClient.on("connect", (connection: connection) => {
this.connectingSubState.step = ConnectingSubStep.GET;
this.connectingSubState.timeout = CONNECTING_SUB_STEP_INITIAL_TIMEOUT;

this.emit(BroadcastEvent.LOG, "WS connection successful");
this.wsConnection = connection;

Expand Down Expand Up @@ -171,6 +214,11 @@ export class BroadcastManager extends EventEmitter {
this.isBroadcastReady = true;

this.broadcastId = broadcastId;
this.connectingSubState = {
step: ConnectingSubStep.NONE,
broadcastId: null,
timeout: CONNECTING_SUB_STEP_INITIAL_TIMEOUT,
};
this._setSlippiStatus(ConnectionStatus.CONNECTED);

// Process any events that may have been missed when we disconnected
Expand Down Expand Up @@ -231,7 +279,7 @@ export class BroadcastManager extends EventEmitter {
case "start-broadcast-resp": {
if (message.recoveryGameCursor !== undefined) {
const firstIncoming = this.incomingEvents[0];
let firstCursor: number | null | undefined;
let firstCursor: number | undefined;
if (firstIncoming) {
firstCursor = firstIncoming.cursor;
}
Expand All @@ -247,7 +295,7 @@ export class BroadcastManager extends EventEmitter {
const isNeededByServer = event.cursor > message.recoveryGameCursor;

// Make sure we aren't duplicating anything that is already in the incoming events array
const isNotIncoming = firstCursor == null || event.cursor < firstCursor;
const isNotIncoming = firstCursor === undefined || event.cursor < firstCursor;

return isNeededByServer && isNotIncoming;
}
Expand All @@ -256,13 +304,11 @@ export class BroadcastManager extends EventEmitter {

this.incomingEvents = [...backedEventsToUse, ...this.incomingEvents];

let newFirstCursor: number | undefined;
const newFirstEvent = this.incomingEvents[0];
if (!newFirstEvent) {
this.emit(BroadcastEvent.LOG, "Missing new first event");
return;
if (newFirstEvent) {
newFirstCursor = newFirstEvent.cursor;
}
const newFirstCursor = newFirstEvent.cursor;

const firstBackupCursor = (this.backupEvents[0] || {}).cursor;
const lastBackupCursor = (last(this.backupEvents) || {}).cursor;

Expand All @@ -277,6 +323,9 @@ export class BroadcastManager extends EventEmitter {
break;
}
case "get-broadcasts-resp": {
this.connectingSubState.step = ConnectingSubStep.START;
this.connectingSubState.timeout = CONNECTING_SUB_STEP_INITIAL_TIMEOUT;

const broadcasts = message.broadcasts || [];

// Grab broadcastId we were currently using if the broadcast still exists, would happen
Expand All @@ -286,6 +335,8 @@ export class BroadcastManager extends EventEmitter {
const prevBroadcast = broadcastsById[this.broadcastId];

if (prevBroadcast) {
this.connectingSubState.broadcastId = prevBroadcast.id;

// TODO: Figure out if this config.name guaranteed to be the correct name?
startBroadcast(prevBroadcast.id, config.name).catch(console.warn);
return;
Expand All @@ -305,10 +356,33 @@ export class BroadcastManager extends EventEmitter {
});

getBroadcasts().catch(console.warn);
const postSocketConnectingSubStepRetry = () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function is giving me pause atm. could you add some comments and explain why this needs to run constantly with setTimeout and instead of only if getBroadcasts fails?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getBroadcasts and startBroadcast only fail if something goes wrong client-side, that is, if we fail to send the message. We expect the server to respond to both of these messages but because WebSockets simply deals with 'messages' or 'events' and doesn't use a request/response paradigm, there no notion of the server failing to respond to a message. Setting a simple exponential backoff retry covers all possible cases:

  1. failure to send
  2. server sends an unexpected/invalid/error response
  3. server sends nothing

if (this.connectingSubState.step === ConnectingSubStep.NONE) {
return;
}

this.emit(
BroadcastEvent.LOG,
`Retrying connecting sub step: ${this.connectingSubState.step} after ${this.connectingSubState.timeout}ms`,
);
this.connectingSubState.timeout *= 2;
if (this.connectingSubState.step === ConnectingSubStep.GET) {
getBroadcasts().catch(console.warn);
} else {
startBroadcast(this.connectingSubState.broadcastId, config.name).catch(console.warn);
}
setTimeout(() => {
postSocketConnectingSubStepRetry();
}, this.connectingSubState.timeout);
};
setTimeout(() => {
postSocketConnectingSubStepRetry();
}, CONNECTING_SUB_STEP_INITIAL_TIMEOUT);
});

this.emit(BroadcastEvent.LOG, "Connecting to WS service");
socket.connect(SLIPPI_WS_SERVER, "broadcast-protocol", undefined, headers);
this.wsClient.connect(SLIPPI_WS_SERVER, "broadcast-protocol", undefined, headers);
this.connectingSubState.step = ConnectingSubStep.SOCKET;
}

public stop() {
Expand All @@ -326,20 +400,28 @@ export class BroadcastManager extends EventEmitter {
return;
}

if (this.wsConnection && this.broadcastId) {
if (this.wsConnection) {
this.emit(BroadcastEvent.LOG, "Disconnecting ws connection...");

this.wsConnection.send(
JSON.stringify({
type: "stop-broadcast",
broadcastId: this.broadcastId,
}),
);
if (this.broadcastId) {
this.wsConnection.send(
JSON.stringify({
type: "stop-broadcast",
broadcastId: this.broadcastId,
}),
);
}

this.wsConnection.close();
this.wsConnection = null;
}

if (this.wsClient) {
this.wsClient.removeAllListeners();
this.wsClient.abort();
this.wsClient = null;
}

// Clear incoming events
this.incomingEvents = [];
}
Expand Down
Loading