forked from googollee/go-socket.io
-
Notifications
You must be signed in to change notification settings - Fork 1
/
connection.go
145 lines (115 loc) · 2.63 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package socketio
import (
"io"
"net"
"net/http"
"net/url"
"reflect"
"sync"
"github.com/googollee/go-socket.io/engineio"
"github.com/googollee/go-socket.io/parser"
)
// Conn is a connection in go-socket.io
type Conn interface {
io.Closer
Namespace
// ID returns session id
ID() string
URL() url.URL
LocalAddr() net.Addr
RemoteAddr() net.Addr
RemoteHeader() http.Header
}
type conn struct {
engineio.Conn
id uint64
handlers *namespaceHandlers
namespaces *namespaces
encoder *parser.Encoder
decoder *parser.Decoder
writeChan chan parser.Payload
errorChan chan error
quitChan chan struct{}
closeOnce sync.Once
}
func newConn(engineConn engineio.Conn, handlers *namespaceHandlers) *conn {
return &conn{
Conn: engineConn,
encoder: parser.NewEncoder(engineConn),
decoder: parser.NewDecoder(engineConn),
errorChan: make(chan error),
writeChan: make(chan parser.Payload),
quitChan: make(chan struct{}),
handlers: handlers,
namespaces: newNamespaces(),
}
}
func (c *conn) Close() error {
var err error
c.closeOnce.Do(func() {
// for each namespace, leave all rooms, and call the disconnect handler.
c.namespaces.Range(func(ns string, nc *namespaceConn) {
if nh, _ := c.handlers.Get(ns); nh != nil && nh.onDisconnect != nil {
nh.onDisconnect(nc, clientDisconnectMsg)
}
nc.LeaveAll()
})
err = c.Conn.Close()
close(c.quitChan)
})
return err
}
func (c *conn) connect() error {
rootHandler, ok := c.handlers.Get(rootNamespace)
if !ok {
return errUnavailableRootHandler
}
root := newNamespaceConn(c, aliasRootNamespace, rootHandler.broadcast)
c.namespaces.Set(rootNamespace, root)
root.Join(root.Conn.ID())
c.namespaces.Range(func(ns string, nc *namespaceConn) {
nc.SetContext(c.Conn.Context())
})
header := parser.Header{
Type: parser.Connect,
}
if err := c.encoder.Encode(header); err != nil {
return err
}
handler, ok := c.handlers.Get(header.Namespace)
if ok {
_, err := handler.dispatch(root, header)
return err
}
return nil
}
func (c *conn) nextID() uint64 {
c.id++
return c.id
}
func (c *conn) write(header parser.Header, args ...reflect.Value) {
data := make([]interface{}, len(args))
for i := range data {
data[i] = args[i].Interface()
}
pkg := parser.Payload{
Header: header,
Data: data,
}
select {
case c.writeChan <- pkg:
case <-c.quitChan:
return
}
}
func (c *conn) onError(namespace string, err error) {
select {
case c.errorChan <- newErrorMessage(namespace, err):
case <-c.quitChan:
return
}
}
func (c *conn) namespace(nsp string) *namespaceHandler {
handler, _ := c.handlers.Get(nsp)
return handler
}