Skip to content

Commit

Permalink
Better handle closing of clients (#51)
Browse files Browse the repository at this point in the history
There was a 'leave' packet that was interpreted differently by the
client and server and we corrected this.
Also don't accept any messages anymore after a close message is
received.
And when a client closes we don't add them to the timeout manager when
the connection is broken.

(leaving of lobbies, as the leave packet was interpreted by the server
was never a feature, soon™)
  • Loading branch information
koenbollen authored Aug 25, 2023
1 parent 0872187 commit 6ed745b
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 33 deletions.
15 changes: 11 additions & 4 deletions internal/signaling/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre
logger.Info("peer websocket closed", zap.String("peer", peer.ID))
conn.Close(websocket.StatusInternalError, "unexpceted closure")

// At this point ctx has already been cancelled, so we create a new one to use for the disconnect.
nctx, cancel := context.WithTimeout(logging.WithLogger(context.Background(), logger), time.Second*10)
defer cancel()
manager.Disconnected(nctx, peer)
if !peer.closedPacketReceived {
// At this point ctx has already been cancelled, so we create a new one to use for the disconnect.
nctx, cancel := context.WithTimeout(logging.WithLogger(context.Background(), logger), time.Second*10)
defer cancel()
manager.Disconnected(nctx, peer)
}
}()

for ctx.Err() == nil {
Expand All @@ -75,6 +77,11 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre
util.ErrorAndDisconnect(ctx, conn, err)
}

if peer.closedPacketReceived {
logger.Warn("received packet after close", zap.String("peer", peer.ID), zap.String("type", typeOnly.Type))
continue
}

switch typeOnly.Type {
case "credentials":
credentials, err := cloudflare.GetCredentials(ctx)
Expand Down
76 changes: 52 additions & 24 deletions internal/signaling/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Peer struct {
store stores.Store
conn *websocket.Conn

closedPacketReceived bool

retrievedIDCallback func(context.Context, *Peer) (bool, error)

ID string
Expand Down Expand Up @@ -121,6 +123,16 @@ func (p *Peer) HandlePacket(ctx context.Context, typ string, raw []byte) error {
return fmt.Errorf("unable to handle packet: %w", err)
}

case "close":
packet := ClosePacket{}
if err := json.Unmarshal(raw, &packet); err != nil {
return fmt.Errorf("unable to unmarshal json: %w", err)
}
err = p.HandleClosePacket(ctx, packet)
if err != nil {
return fmt.Errorf("unable to handle packet: %w", err)
}

case "list":
packet := ListPacket{}
if err := json.Unmarshal(raw, &packet); err != nil {
Expand Down Expand Up @@ -151,30 +163,7 @@ func (p *Peer) HandlePacket(ctx context.Context, typ string, raw []byte) error {
return fmt.Errorf("unable to handle packet: %w", err)
}

case "leave":
go metrics.Record(ctx, "lobby", "leave", p.Game, p.ID, p.Lobby)
logger.Info("leaving lobby", zap.String("lobby", p.Lobby))

others, err := p.store.LeaveLobby(ctx, p.Game, p.Lobby, p.ID)
if err != nil {
return fmt.Errorf("unable to leave lobby: %w", err)
}
packet := DisconnectPacket{
Type: "disconnect",
ID: p.ID,
}
data, err := json.Marshal(packet)
if err == nil {
for _, id := range others {
if id != p.ID {
err := p.store.Publish(ctx, p.Game+p.Lobby+id, data)
if err != nil {
logger.Error("failed to publish disconnect packet", zap.Error(err))
}
}
}
}
p.Lobby = ""
// case "leave":

case "connected": // TODO: Do we want to keep track of connections between peers?
case "disconnected": // TODO: Do we want to keep track of connections between peers?
Expand Down Expand Up @@ -269,6 +258,45 @@ func (p *Peer) HandleHelloPacket(ctx context.Context, packet HelloPacket) error
})
}

func (p *Peer) HandleClosePacket(ctx context.Context, packet ClosePacket) error {
logger := logging.GetLogger(ctx)
go metrics.Record(ctx, "client", "close", p.Game, p.ID, p.Lobby)

p.closedPacketReceived = true

logger.Info("client closed",
zap.String("game", p.Game),
zap.String("peer", p.ID),
zap.String("lobby", p.Lobby),
zap.String("reason", packet.Reason),
)

if p.Lobby != "" {
others, err := p.store.LeaveLobby(ctx, p.Game, p.Lobby, p.ID)
if err != nil {
return fmt.Errorf("unable to leave lobby: %w", err)
}
packet := DisconnectPacket{
Type: "disconnect",
ID: p.ID,
}
data, err := json.Marshal(packet)
if err == nil {
for _, id := range others {
if id != p.ID {
err := p.store.Publish(ctx, p.Game+p.Lobby+id, data)
if err != nil {
logger.Error("failed to publish disconnect packet", zap.Error(err))
}
}
}
}
p.Lobby = ""
}

return nil
}

func (p *Peer) HandleListPacket(ctx context.Context, packet ListPacket) error {
logger := logging.GetLogger(ctx)
if p.ID == "" {
Expand Down
2 changes: 1 addition & 1 deletion internal/signaling/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type DisconnectPacket struct {
Reason string `json:"reason"`
}

type LeavePacket struct {
type ClosePacket struct {
Type string `json:"type"`

ID string `json:"id"`
Expand Down
2 changes: 1 addition & 1 deletion lib/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export default class Network extends EventEmitter<NetworkListeners> {

if (this.id !== '') {
this.signaling.send({
type: 'leave',
type: 'close',
id: this.id,
reason: reason ?? 'normal closure'
})
Expand Down
6 changes: 3 additions & 3 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ interface Base {

export type SignalingPacketTypes =
| CandidatePacket
| ClosePacket
| ConnectedPacket
| ConnectPacket
| CreatePacket
Expand All @@ -39,7 +40,6 @@ export type SignalingPacketTypes =
| HelloPacket
| JoinedPacket
| JoinPacket
| LeavePacket
| ListPacket
| LobbiesPacket
| WelcomePacket
Expand Down Expand Up @@ -91,8 +91,8 @@ export interface JoinedPacket extends Base {
id: string
}

export interface LeavePacket extends Base {
type: 'leave'
export interface ClosePacket extends Base {
type: 'close'
id: string
reason: string
}
Expand Down

0 comments on commit 6ed745b

Please sign in to comment.