-
Notifications
You must be signed in to change notification settings - Fork 5
/
read-writer.go
124 lines (106 loc) · 2.53 KB
/
read-writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package rpcstream
import (
"bytes"
"errors"
"io"
"github.com/aperturerobotics/starpc/srpc"
)
// RpcStreamReadWriter adapts an RpcStream to a byte-oriented reader/writer.
//
// Write sends each payload as one data packet; Read copies received packet
// data into the caller's slice and keeps any bytes that did not fit in buf so
// that the next Read returns them first.
type RpcStreamReadWriter struct {
// stream is the underlying RpcStream used for Send, Recv and Close.
stream RpcStream
// buf holds received bytes that did not fit into the slice passed to a
// previous Read call.
buf bytes.Buffer
}
// NewRpcStreamReadWriter wraps stream in a new RpcStreamReadWriter.
//
// The returned value starts with an empty read buffer.
func NewRpcStreamReadWriter(stream RpcStream) *RpcStreamReadWriter {
	rw := new(RpcStreamReadWriter)
	rw.stream = stream
	return rw
}
// ReadPump forwards packet data from strm to cb until ReadToHandler returns,
// then reports the resulting error to closed.
//
// The call blocks for the lifetime of the read loop and does not start a
// goroutine itself; run it with `go` if it must not block the caller.
// closed may be nil, in which case the error is dropped.
func ReadPump(strm RpcStream, cb srpc.PacketDataHandler, closed srpc.CloseHandler) {
	readErr := ReadToHandler(strm, cb)
	if closed == nil {
		return
	}
	// Tell the owner that the stream has finished and why.
	closed(readErr)
}
// ReadToHandler receives packets from strm in a loop and passes the data of
// every packet with a non-empty payload to cb.
//
// It returns the first error produced by strm.Recv or by cb and never returns
// nil. It does not close the stream; use ReadPump to be notified when the
// loop ends.
//
// NOTE(review): unlike RpcStreamReadWriter.Read, ack errors carried by a
// packet are not inspected here — confirm this is intended.
func ReadToHandler(strm RpcStream, cb srpc.PacketDataHandler) error {
	for {
		pkt, recvErr := strm.Recv()
		if recvErr != nil {
			return recvErr
		}

		// Packets without a data payload are skipped silently.
		if payload := pkt.GetData(); len(payload) != 0 {
			if cbErr := cb(payload); cbErr != nil {
				return cbErr
			}
		}
	}
}
// Write sends p to the stream as a single data packet.
//
// An empty p is a no-op that returns (0, nil) without sending anything. On a
// send failure it returns (0, err); otherwise it returns (len(p), nil).
//
// NOTE(review): p is placed in the packet without copying — confirm that
// Send does not retain the packet after it returns.
func (r *RpcStreamReadWriter) Write(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}

	pkt := &RpcStreamPacket{
		Body: &RpcStreamPacket_Data{Data: p},
	}
	if sendErr := r.stream.Send(pkt); sendErr != nil {
		return 0, sendErr
	}
	return len(p), nil
}
// Read copies received stream data into p.
//
// An empty p returns (0, nil) immediately. If bytes are left over from an
// earlier packet they are returned first and the stream is not touched.
// Otherwise Read receives packets until one carries a non-empty data payload,
// copies as much of it as fits into p and buffers the remainder for the next
// call. Each call therefore returns data from at most one source: the
// internal buffer or a single packet.
//
// An error from Recv is returned as-is with n == 0. A packet whose ack
// carries a non-empty error string yields that string as a new error.
func (r *RpcStreamReadWriter) Read(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}

	// Serve leftovers from a previous packet before receiving a new one.
	if r.buf.Len() != 0 {
		return r.buf.Read(p)
	}

	for {
		pkt, recvErr := r.stream.Recv()
		if recvErr != nil {
			return 0, recvErr
		}
		if msg := pkt.GetAck().GetError(); msg != "" {
			return 0, errors.New(msg)
		}

		payload := pkt.GetData()
		if len(payload) == 0 {
			// Nothing to deliver yet; wait for the next packet.
			continue
		}

		// Copy straight into the caller's slice and keep whatever did not fit.
		copied := copy(p, payload)
		if rest := payload[copied:]; len(rest) != 0 {
			_, _ = r.buf.Write(rest) // never returns an error
		}
		return copied, nil
	}
}
// Close closes the underlying stream and returns the error it reports.
//
// Bytes still held in the read buffer are left untouched.
func (r *RpcStreamReadWriter) Close() error {
	closeErr := r.stream.Close()
	return closeErr
}
// Compile-time check that *RpcStreamReadWriter implements io.ReadWriteCloser.
var _ io.ReadWriteCloser = (*RpcStreamReadWriter)(nil)