forked from Jille/raft-grpc-transport
-
Notifications
You must be signed in to change notification settings - Fork 0
/
grpcapi.go
129 lines (114 loc) · 3.31 KB
/
grpcapi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package transport
import (
"context"
"io"
pb "github.com/Jille/raft-grpc-transport/proto"
"github.com/hashicorp/raft"
)
// These are requests incoming over gRPC that we need to relay to the Raft engine.
type gRPCAPI struct {
manager *Manager
// "Unsafe" to ensure compilation fails if new methods are added but not implemented
pb.UnsafeRaftTransportServer
}
func (g gRPCAPI) handleRPC(command interface{}, data io.Reader) (interface{}, error) {
ch := make(chan raft.RPCResponse, 1)
rpc := raft.RPC{
Command: command,
RespChan: ch,
Reader: data,
}
if isHeartbeat(command) {
// We can take the fast path and use the heartbeat callback and skip the queue in g.manager.rpcChan.
g.manager.heartbeatFuncMtx.Lock()
fn := g.manager.heartbeatFunc
g.manager.heartbeatFuncMtx.Unlock()
if fn != nil {
fn(rpc)
goto wait
}
}
g.manager.rpcChan <- rpc
wait:
resp := <-ch
if resp.Error != nil {
return nil, resp.Error
}
return resp.Response, nil
}
func (g gRPCAPI) AppendEntries(ctx context.Context, req *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error) {
resp, err := g.handleRPC(decodeAppendEntriesRequest(req), nil)
if err != nil {
return nil, err
}
return encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse)), nil
}
func (g gRPCAPI) RequestVote(ctx context.Context, req *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) {
resp, err := g.handleRPC(decodeRequestVoteRequest(req), nil)
if err != nil {
return nil, err
}
return encodeRequestVoteResponse(resp.(*raft.RequestVoteResponse)), nil
}
func (g gRPCAPI) TimeoutNow(ctx context.Context, req *pb.TimeoutNowRequest) (*pb.TimeoutNowResponse, error) {
resp, err := g.handleRPC(decodeTimeoutNowRequest(req), nil)
if err != nil {
return nil, err
}
return encodeTimeoutNowResponse(resp.(*raft.TimeoutNowResponse)), nil
}
func (g gRPCAPI) InstallSnapshot(s pb.RaftTransport_InstallSnapshotServer) error {
isr, err := s.Recv()
if err != nil {
return err
}
resp, err := g.handleRPC(decodeInstallSnapshotRequest(isr), &snapshotStream{s, isr.GetData()})
if err != nil {
return err
}
return s.SendAndClose(encodeInstallSnapshotResponse(resp.(*raft.InstallSnapshotResponse)))
}
type snapshotStream struct {
s pb.RaftTransport_InstallSnapshotServer
buf []byte
}
func (s *snapshotStream) Read(b []byte) (int, error) {
if len(s.buf) > 0 {
n := copy(b, s.buf)
s.buf = s.buf[n:]
return n, nil
}
m, err := s.s.Recv()
if err != nil {
return 0, err
}
n := copy(b, m.GetData())
if n < len(m.GetData()) {
s.buf = m.GetData()[n:]
}
return n, nil
}
func (g gRPCAPI) AppendEntriesPipeline(s pb.RaftTransport_AppendEntriesPipelineServer) error {
for {
msg, err := s.Recv()
if err != nil {
return err
}
resp, err := g.handleRPC(decodeAppendEntriesRequest(msg), nil)
if err != nil {
// TODO(quis): One failure doesn't have to break the entire stream?
// Or does it all go wrong when it's out of order anyway?
return err
}
if err := s.Send(encodeAppendEntriesResponse(resp.(*raft.AppendEntriesResponse))); err != nil {
return err
}
}
}
func isHeartbeat(command interface{}) bool {
req, ok := command.(*raft.AppendEntriesRequest)
if !ok {
return false
}
return req.Term != 0 && len(req.Leader) != 0 && req.PrevLogEntry == 0 && req.PrevLogTerm == 0 && len(req.Entries) == 0 && req.LeaderCommitIndex == 0
}