From 976444f77d5c4e052550a6cee9cc6eb3d01c0eb9 Mon Sep 17 00:00:00 2001 From: aleksander-vedvik Date: Wed, 13 Mar 2024 11:39:31 +0100 Subject: [PATCH 01/18] fix(connection): will retry connection for each message The channel will try to connect to the node for each message if a connection has not yet been established or if the node has disconnected. --- channel.go | 67 +++++++++++++++++++++++++++++++++++++++++++++------- gorums.pb.go | 26 ++++++++++---------- mgr.go | 4 +++- node.go | 44 ++++++++++++++++++++++++---------- opts.go | 5 ++-- 5 files changed, 110 insertions(+), 36 deletions(-) diff --git a/channel.go b/channel.go index 8d884737..38dcd872 100644 --- a/channel.go +++ b/channel.go @@ -2,6 +2,7 @@ package gorums import ( "context" + "fmt" "math" "math/rand" "sync" @@ -35,7 +36,7 @@ type responseRouter struct { type channel struct { sendQ chan request - nodeID uint32 + node *RawNode mu sync.Mutex lastError error latency time.Duration @@ -50,29 +51,39 @@ type channel struct { cancelStream context.CancelFunc responseRouters map[uint64]responseRouter responseMut sync.Mutex + connEstablished bool } func newChannel(n *RawNode) *channel { return &channel{ sendQ: make(chan request, n.mgr.opts.sendBuffer), backoffCfg: n.mgr.opts.backoff, - nodeID: n.ID(), + node: n, latency: -1 * time.Second, rand: rand.New(rand.NewSource(time.Now().UnixNano())), responseRouters: make(map[uint64]responseRouter), + connEstablished: false, } } func (c *channel) connect(ctx context.Context, conn *grpc.ClientConn) error { - var err error c.parentCtx = ctx + go c.sendMsgs() + if conn == nil { + return fmt.Errorf("connection is nil") + } + return c.tryConnect(conn) +} + +func (c *channel) tryConnect(conn *grpc.ClientConn) error { + var err error c.streamCtx, c.cancelStream = context.WithCancel(c.parentCtx) c.gorumsClient = ordering.NewGorumsClient(conn) c.gorumsStream, err = c.gorumsClient.NodeStream(c.streamCtx) if err != nil { return err } - go c.sendMsgs() + c.connEstablished = true go c.recvMsgs() return nil } @@ -160,17 +171,23 @@ func (c *channel) sendMsgs() { return case req = <-c.sendQ: } + // try to connect to the node if previous attempts + // have failed or if the node has disconnected + if !c.isConnected() { + // streamBroken will be set if the connection fails + c.tryReconnect() + } // return error if stream is broken if c.streamBroken.get() { err := status.Errorf(codes.Unavailable, "stream is down") - c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.nodeID, msg: nil, err: err}) + c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), msg: nil, err: err}) continue } // else try to send message err := c.sendMsg(req) if err != nil { // return the error - c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.nodeID, msg: nil, err: err}) + c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), msg: nil, err: err}) } } } @@ -189,7 +206,7 @@ func (c *channel) recvMsgs() { } else { c.streamMut.RUnlock() err := status.FromProto(resp.Metadata.GetStatus()).Err() - c.routeResponse(resp.Metadata.MessageID, response{nid: c.nodeID, msg: resp.Message, err: err}) + c.routeResponse(resp.Metadata.MessageID, response{nid: c.node.ID(), msg: resp.Message, err: err}) } select { @@ -200,11 +217,37 @@ func (c *channel) recvMsgs() { } } -func (c *channel) reconnect() { +func (c *channel) tryReconnect() { + // a connection has never been established + if !c.connEstablished { + err := c.node.dial() + if err != nil { + c.streamBroken.set() + return + } + err = c.tryConnect(c.node.conn) + if err != nil { + c.streamBroken.set() + return + } + } + // the node has previously been connected + // but is now disconnected + if c.streamBroken.get() { + // try to reconnect only once + c.reconnect(1) + } +} + +func (c *channel) reconnect(maxRetries ...int) { c.streamMut.Lock() defer c.streamMut.Unlock() backoffCfg := c.backoffCfg + var maxretries float64 = -1 + if len(maxRetries) > 0 { + maxretries = float64(maxRetries[0]) + } var retries float64 for { var err error @@ -217,6 +260,10 @@ func (c *channel) reconnect() { } c.cancelStream() c.setLastErr(err) + if retries >= maxretries && maxretries > 0 { + c.streamBroken.set() + return + } delay := float64(backoffCfg.BaseDelay) max := float64(backoffCfg.MaxDelay) for r := retries; delay < max && r > 0; r-- { @@ -257,6 +304,10 @@ type atomicFlag struct { flag int32 } +func (c *channel) isConnected() bool { + return c.connEstablished && !c.streamBroken.get() +} + func (f *atomicFlag) set() { atomic.StoreInt32(&f.flag, 1) } func (f *atomicFlag) get() bool { return atomic.LoadInt32(&f.flag) == 1 } func (f *atomicFlag) clear() { atomic.StoreInt32(&f.flag, 0) } diff --git a/gorums.pb.go b/gorums.pb.go index 78befb0a..775eb5a4 100644 --- a/gorums.pb.go +++ b/gorums.pb.go @@ -1,15 +1,15 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.27.1 -// protoc v3.19.2 +// protoc-gen-go v1.32.0 +// protoc v3.12.4 // source: gorums.proto package gorums import ( + descriptor "github.com/golang/protobuf/protoc-gen-go/descriptor" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - descriptorpb "google.golang.org/protobuf/types/descriptorpb" reflect "reflect" ) @@ -22,7 +22,7 @@ const ( var file_gorums_proto_extTypes = []protoimpl.ExtensionInfo{ { - ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtendedType: (*descriptor.MethodOptions)(nil), ExtensionType: (*bool)(nil), Field: 50001, Name: "gorums.rpc", @@ -30,7 +30,7 @@ var file_gorums_proto_extTypes = []protoimpl.ExtensionInfo{ Filename: "gorums.proto", }, { - ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtendedType: (*descriptor.MethodOptions)(nil), ExtensionType: (*bool)(nil), Field: 50002, Name: "gorums.unicast", @@ -38,7 +38,7 @@ var file_gorums_proto_extTypes = []protoimpl.ExtensionInfo{ Filename: "gorums.proto", }, { - ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtendedType: (*descriptor.MethodOptions)(nil), ExtensionType: (*bool)(nil), Field: 50003, Name: "gorums.multicast", @@ -46,7 +46,7 @@ var file_gorums_proto_extTypes = []protoimpl.ExtensionInfo{ Filename: "gorums.proto", }, { - ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtendedType: (*descriptor.MethodOptions)(nil), ExtensionType: (*bool)(nil), Field: 50004, Name: "gorums.quorumcall", @@ -54,7 +54,7 @@ var file_gorums_proto_extTypes = []protoimpl.ExtensionInfo{ Filename: "gorums.proto", }, { - ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtendedType: (*descriptor.MethodOptions)(nil), ExtensionType: (*bool)(nil), Field: 50005, Name: "gorums.correctable", @@ -62,7 +62,7 @@ var file_gorums_proto_extTypes = []protoimpl.ExtensionInfo{ Filename: "gorums.proto", }, { - ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtendedType: (*descriptor.MethodOptions)(nil), ExtensionType: (*bool)(nil), Field: 50010, Name: "gorums.async", @@ -70,7 +70,7 @@ var file_gorums_proto_extTypes = []protoimpl.ExtensionInfo{ Filename: "gorums.proto", }, { - ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtendedType: (*descriptor.MethodOptions)(nil), ExtensionType: (*bool)(nil), Field: 50020, Name: "gorums.per_node_arg", @@ -78,7 +78,7 @@ var file_gorums_proto_extTypes = []protoimpl.ExtensionInfo{ Filename: "gorums.proto", }, { - ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtendedType: (*descriptor.MethodOptions)(nil), ExtensionType: (*string)(nil), Field: 50030, Name: "gorums.custom_return_type", @@ -87,7 +87,7 @@ var file_gorums_proto_extTypes = []protoimpl.ExtensionInfo{ }, } -// Extension fields to descriptorpb.MethodOptions. +// Extension fields to descriptor.MethodOptions. var ( // call types // @@ -155,7 +155,7 @@ var file_gorums_proto_rawDesc = []byte{ } var file_gorums_proto_goTypes = []interface{}{ - (*descriptorpb.MethodOptions)(nil), // 0: google.protobuf.MethodOptions + (*descriptor.MethodOptions)(nil), // 0: google.protobuf.MethodOptions } var file_gorums_proto_depIdxs = []int32{ 0, // 0: gorums.rpc:extendee -> google.protobuf.MethodOptions diff --git a/mgr.go b/mgr.go index 8097fe48..2b1914a7 100644 --- a/mgr.go +++ b/mgr.go @@ -119,7 +119,9 @@ func (m *RawManager) AddNode(node *RawNode) error { m.logger.Printf("connecting to %s with id %d\n", node, node.id) } if err := node.connect(m); err != nil { - return fmt.Errorf("connection failed for %s: %w", node, err) + if m.logger != nil { + m.logger.Println(fmt.Errorf("connection failed for %s: %w. will retry later.", node, err)) + } } m.mu.Lock() diff --git a/node.go b/node.go index a7914d44..da0877b3 100644 --- a/node.go +++ b/node.go @@ -61,28 +61,48 @@ func NewRawNodeWithID(addr string, id uint32) (*RawNode, error) { // connect to this node and associate it with the manager. func (n *RawNode) connect(mgr *RawManager) error { n.mgr = mgr + n.channel = newChannel(n) if n.mgr.opts.noConnect { return nil } - n.channel = newChannel(n) - var err error - ctx, cancel := context.WithTimeout(context.Background(), n.mgr.opts.nodeDialTimeout) - defer cancel() - n.conn, err = grpc.DialContext(ctx, n.addr, n.mgr.opts.grpcDialOpts...) - if err != nil { - return fmt.Errorf("dialing node failed: %w", err) + // ignoring the error because it will try to reconnect + // at a later time. + _ = n.dial() + ctx := n.ctxSetup() + if err := n.channel.connect(ctx, n.conn); err != nil { + return fmt.Errorf("starting stream failed: %w", err) + } + return nil +} + +// dials the node if it has not been done previously +func (n *RawNode) dial() error { + if n.conn == nil { + var err error + ctx, cancel := context.WithTimeout(context.Background(), n.mgr.opts.nodeDialTimeout) + defer cancel() + // error is ignored because we will retry the dial at a later time + n.conn, err = grpc.DialContext(ctx, n.addr, n.mgr.opts.grpcDialOpts...) + return err } + return nil +} + +// creates a context that governs the channel. It is +// used to stop all channel goroutines and the NodeStream. +// +// this method should be run for each connection to ensure +// fresh contexts. Reusing contexts could result in reusing +// a cancelled context. +func (n *RawNode) ctxSetup() context.Context { md := n.mgr.opts.metadata.Copy() if n.mgr.opts.perNodeMD != nil { md = metadata.Join(md, n.mgr.opts.perNodeMD(n.id)) } - // a context for all of the streams + var ctx context.Context ctx, n.cancel = context.WithCancel(context.Background()) ctx = metadata.NewOutgoingContext(ctx, md) - if err = n.channel.connect(ctx, n.conn); err != nil { - return fmt.Errorf("starting stream failed: %w", err) - } - return nil + return ctx } // close this node. diff --git a/opts.go b/opts.go index f58908ca..2def8017 100644 --- a/opts.go +++ b/opts.go @@ -22,8 +22,9 @@ type managerOptions struct { func newManagerOptions() managerOptions { return managerOptions{ - backoff: backoff.DefaultConfig, - sendBuffer: 0, + backoff: backoff.DefaultConfig, + sendBuffer: 0, + nodeDialTimeout: 50 * time.Millisecond, } } From 301dbe278b3b6ff5412c1e03b080149b32cbecf7 Mon Sep 17 00:00:00 2001 From: aleksander-vedvik Date: Wed, 13 Mar 2024 12:02:33 +0100 Subject: [PATCH 02/18] refactor: ran make dev to regenerate all dev files --- cmd/protoc-gen-gorums/dev/zorums.pb.go | 14 ++++---- .../dev/zorums_async_gorums.pb.go | 12 +++---- .../dev/zorums_correctable_gorums.pb.go | 20 +++++------ .../dev/zorums_multicast_gorums.pb.go | 10 +++--- .../dev/zorums_qspec_gorums.pb.go | 28 +++++++-------- .../dev/zorums_quorumcall_gorums.pb.go | 16 ++++----- .../dev/zorums_rpc_gorums.pb.go | 2 +- .../dev/zorums_server_gorums.pb.go | 34 +++++++++---------- .../dev/zorums_types_gorums.pb.go | 18 +++++----- .../dev/zorums_unicast_gorums.pb.go | 6 ++-- 10 files changed, 80 insertions(+), 80 deletions(-) diff --git a/cmd/protoc-gen-gorums/dev/zorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums.pb.go index ee46fc60..eab7dc98 100644 --- a/cmd/protoc-gen-gorums/dev/zorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums.pb.go @@ -1,16 +1,16 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.24.4 +// protoc-gen-go v1.32.0 +// protoc v3.12.4 // source: zorums.proto package dev import ( + empty "github.com/golang/protobuf/ptypes/empty" _ "github.com/relab/gorums" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" reflect "reflect" sync "sync" ) @@ -332,10 +332,10 @@ func file_zorums_proto_rawDescGZIP() []byte { var file_zorums_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_zorums_proto_goTypes = []interface{}{ - (*Request)(nil), // 0: dev.Request - (*Response)(nil), // 1: dev.Response - (*MyResponse)(nil), // 2: dev.MyResponse - (*emptypb.Empty)(nil), // 3: google.protobuf.Empty + (*Request)(nil), // 0: dev.Request + (*Response)(nil), // 1: dev.Response + (*MyResponse)(nil), // 2: dev.MyResponse + (*empty.Empty)(nil), // 3: google.protobuf.Empty } var file_zorums_proto_depIdxs = []int32{ 0, // 0: dev.ZorumsService.GRPCCall:input_type -> dev.Request diff --git a/cmd/protoc-gen-gorums/dev/zorums_async_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_async_gorums.pb.go index a53800e4..9479840b 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_async_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_async_gorums.pb.go @@ -1,16 +1,16 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.7.0-devel -// protoc v4.24.4 +// protoc v3.12.4 // source: zorums.proto package dev import ( context "context" + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" protoreflect "google.golang.org/protobuf/reflect/protoreflect" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -123,9 +123,9 @@ func (c *Configuration) QuorumCallAsyncEmpty(ctx context.Context, in *Request) * Method: "dev.ZorumsService.QuorumCallAsyncEmpty", } cd.QuorumFunction = func(req protoreflect.ProtoMessage, replies map[uint32]protoreflect.ProtoMessage) (protoreflect.ProtoMessage, bool) { - r := make(map[uint32]*emptypb.Empty, len(replies)) + r := make(map[uint32]*empty.Empty, len(replies)) for k, v := range replies { - r[k] = v.(*emptypb.Empty) + r[k] = v.(*empty.Empty) } return c.qspec.QuorumCallAsyncEmptyQF(req.(*Request), r) } @@ -136,7 +136,7 @@ func (c *Configuration) QuorumCallAsyncEmpty(ctx context.Context, in *Request) * // QuorumCallAsyncEmpty2 for testing imported message type; with same return // type as QuorumCallAsync: Response. -func (c *Configuration) QuorumCallAsyncEmpty2(ctx context.Context, in *emptypb.Empty) *AsyncResponse { +func (c *Configuration) QuorumCallAsyncEmpty2(ctx context.Context, in *empty.Empty) *AsyncResponse { cd := gorums.QuorumCallData{ Message: in, Method: "dev.ZorumsService.QuorumCallAsyncEmpty2", @@ -146,7 +146,7 @@ func (c *Configuration) QuorumCallAsyncEmpty2(ctx context.Context, in *emptypb.E for k, v := range replies { r[k] = v.(*Response) } - return c.qspec.QuorumCallAsyncEmpty2QF(req.(*emptypb.Empty), r) + return c.qspec.QuorumCallAsyncEmpty2QF(req.(*empty.Empty), r) } fut := c.RawConfiguration.AsyncCall(ctx, cd) diff --git a/cmd/protoc-gen-gorums/dev/zorums_correctable_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_correctable_gorums.pb.go index 171d056f..611b4e03 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_correctable_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_correctable_gorums.pb.go @@ -1,16 +1,16 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.7.0-devel -// protoc v4.24.4 +// protoc v3.12.4 // source: zorums.proto package dev import ( context "context" + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" protoreflect "google.golang.org/protobuf/reflect/protoreflect" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -110,9 +110,9 @@ func (c *Configuration) CorrectableEmpty(ctx context.Context, in *Request) *Corr ServerStream: false, } cd.QuorumFunction = func(req protoreflect.ProtoMessage, replies map[uint32]protoreflect.ProtoMessage) (protoreflect.ProtoMessage, int, bool) { - r := make(map[uint32]*emptypb.Empty, len(replies)) + r := make(map[uint32]*empty.Empty, len(replies)) for k, v := range replies { - r[k] = v.(*emptypb.Empty) + r[k] = v.(*empty.Empty) } return c.qspec.CorrectableEmptyQF(req.(*Request), r) } @@ -123,7 +123,7 @@ func (c *Configuration) CorrectableEmpty(ctx context.Context, in *Request) *Corr // CorrectableEmpty2 for testing imported message type; with same return // type as Correctable: Response. -func (c *Configuration) CorrectableEmpty2(ctx context.Context, in *emptypb.Empty) *CorrectableResponse { +func (c *Configuration) CorrectableEmpty2(ctx context.Context, in *empty.Empty) *CorrectableResponse { cd := gorums.CorrectableCallData{ Message: in, Method: "dev.ZorumsService.CorrectableEmpty2", @@ -134,7 +134,7 @@ func (c *Configuration) CorrectableEmpty2(ctx context.Context, in *emptypb.Empty for k, v := range replies { r[k] = v.(*Response) } - return c.qspec.CorrectableEmpty2QF(req.(*emptypb.Empty), r) + return c.qspec.CorrectableEmpty2QF(req.(*empty.Empty), r) } corr := c.RawConfiguration.CorrectableCall(ctx, cd) @@ -231,9 +231,9 @@ func (c *Configuration) CorrectableStreamEmpty(ctx context.Context, in *Request) ServerStream: true, } cd.QuorumFunction = func(req protoreflect.ProtoMessage, replies map[uint32]protoreflect.ProtoMessage) (protoreflect.ProtoMessage, int, bool) { - r := make(map[uint32]*emptypb.Empty, len(replies)) + r := make(map[uint32]*empty.Empty, len(replies)) for k, v := range replies { - r[k] = v.(*emptypb.Empty) + r[k] = v.(*empty.Empty) } return c.qspec.CorrectableStreamEmptyQF(req.(*Request), r) } @@ -244,7 +244,7 @@ func (c *Configuration) CorrectableStreamEmpty(ctx context.Context, in *Request) // CorrectableEmpty2 for testing imported message type; with same return // type as Correctable: Response. -func (c *Configuration) CorrectableStreamEmpty2(ctx context.Context, in *emptypb.Empty) *CorrectableStreamResponse { +func (c *Configuration) CorrectableStreamEmpty2(ctx context.Context, in *empty.Empty) *CorrectableStreamResponse { cd := gorums.CorrectableCallData{ Message: in, Method: "dev.ZorumsService.CorrectableStreamEmpty2", @@ -255,7 +255,7 @@ func (c *Configuration) CorrectableStreamEmpty2(ctx context.Context, in *emptypb for k, v := range replies { r[k] = v.(*Response) } - return c.qspec.CorrectableStreamEmpty2QF(req.(*emptypb.Empty), r) + return c.qspec.CorrectableStreamEmpty2QF(req.(*empty.Empty), r) } corr := c.RawConfiguration.CorrectableCall(ctx, cd) diff --git a/cmd/protoc-gen-gorums/dev/zorums_multicast_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_multicast_gorums.pb.go index d2581bc3..19994a79 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_multicast_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_multicast_gorums.pb.go @@ -1,16 +1,16 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.7.0-devel -// protoc v4.24.4 +// protoc v3.12.4 // source: zorums.proto package dev import ( context "context" + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" protoreflect "google.golang.org/protobuf/reflect/protoreflect" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -55,7 +55,7 @@ func (c *Configuration) Multicast2(ctx context.Context, in *Request, opts ...gor } // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // Multicast3 is testing imported message type. func (c *Configuration) Multicast3(ctx context.Context, in *Request, opts ...gorums.CallOption) { @@ -68,10 +68,10 @@ func (c *Configuration) Multicast3(ctx context.Context, in *Request, opts ...gor } // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // Multicast4 is testing imported message type. -func (c *Configuration) Multicast4(ctx context.Context, in *emptypb.Empty, opts ...gorums.CallOption) { +func (c *Configuration) Multicast4(ctx context.Context, in *empty.Empty, opts ...gorums.CallOption) { cd := gorums.QuorumCallData{ Message: in, Method: "dev.ZorumsService.Multicast4", diff --git a/cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go index e996362c..0aa7c57f 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go @@ -1,14 +1,14 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.7.0-devel -// protoc v4.24.4 +// protoc v3.12.4 // source: zorums.proto package dev import ( + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -54,15 +54,15 @@ type QuorumSpec interface { // quorum call method. The in parameter is the request object // supplied to the QuorumCallEmpty method at call time, and may or may not // be used by the quorum function. If the in parameter is not needed - // you should implement your quorum function with '_ *emptypb.Empty'. - QuorumCallEmptyQF(in *emptypb.Empty, replies map[uint32]*Response) (*Response, bool) + // you should implement your quorum function with '_ *empty.Empty'. + QuorumCallEmptyQF(in *empty.Empty, replies map[uint32]*Response) (*Response, bool) // QuorumCallEmpty2QF is the quorum function for the QuorumCallEmpty2 // quorum call method. The in parameter is the request object // supplied to the QuorumCallEmpty2 method at call time, and may or may not // be used by the quorum function. If the in parameter is not needed // you should implement your quorum function with '_ *Request'. - QuorumCallEmpty2QF(in *Request, replies map[uint32]*emptypb.Empty) (*emptypb.Empty, bool) + QuorumCallEmpty2QF(in *Request, replies map[uint32]*empty.Empty) (*empty.Empty, bool) // QuorumCallAsyncQF is the quorum function for the QuorumCallAsync // asynchronous quorum call method. The in parameter is the request object @@ -104,14 +104,14 @@ type QuorumSpec interface { // supplied to the QuorumCallAsyncEmpty method at call time, and may or may not // be used by the quorum function. If the in parameter is not needed // you should implement your quorum function with '_ *Request'. - QuorumCallAsyncEmptyQF(in *Request, replies map[uint32]*emptypb.Empty) (*emptypb.Empty, bool) + QuorumCallAsyncEmptyQF(in *Request, replies map[uint32]*empty.Empty) (*empty.Empty, bool) // QuorumCallAsyncEmpty2QF is the quorum function for the QuorumCallAsyncEmpty2 // asynchronous quorum call method. The in parameter is the request object // supplied to the QuorumCallAsyncEmpty2 method at call time, and may or may not // be used by the quorum function. If the in parameter is not needed - // you should implement your quorum function with '_ *emptypb.Empty'. - QuorumCallAsyncEmpty2QF(in *emptypb.Empty, replies map[uint32]*Response) (*Response, bool) + // you should implement your quorum function with '_ *empty.Empty'. + QuorumCallAsyncEmpty2QF(in *empty.Empty, replies map[uint32]*Response) (*Response, bool) // CorrectableQF is the quorum function for the Correctable // correctable quorum call method. The in parameter is the request object @@ -146,14 +146,14 @@ type QuorumSpec interface { // supplied to the CorrectableEmpty method at call time, and may or may not // be used by the quorum function. If the in parameter is not needed // you should implement your quorum function with '_ *Request'. - CorrectableEmptyQF(in *Request, replies map[uint32]*emptypb.Empty) (*emptypb.Empty, int, bool) + CorrectableEmptyQF(in *Request, replies map[uint32]*empty.Empty) (*empty.Empty, int, bool) // CorrectableEmpty2QF is the quorum function for the CorrectableEmpty2 // correctable quorum call method. The in parameter is the request object // supplied to the CorrectableEmpty2 method at call time, and may or may not // be used by the quorum function. If the in parameter is not needed - // you should implement your quorum function with '_ *emptypb.Empty'. - CorrectableEmpty2QF(in *emptypb.Empty, replies map[uint32]*Response) (*Response, int, bool) + // you should implement your quorum function with '_ *empty.Empty'. + CorrectableEmpty2QF(in *empty.Empty, replies map[uint32]*Response) (*Response, int, bool) // CorrectableStreamQF is the quorum function for the CorrectableStream // correctable stream quorum call method. The in parameter is the request object @@ -188,12 +188,12 @@ type QuorumSpec interface { // supplied to the CorrectableStreamEmpty method at call time, and may or may not // be used by the quorum function. If the in parameter is not needed // you should implement your quorum function with '_ *Request'. - CorrectableStreamEmptyQF(in *Request, replies map[uint32]*emptypb.Empty) (*emptypb.Empty, int, bool) + CorrectableStreamEmptyQF(in *Request, replies map[uint32]*empty.Empty) (*empty.Empty, int, bool) // CorrectableStreamEmpty2QF is the quorum function for the CorrectableStreamEmpty2 // correctable stream quorum call method. The in parameter is the request object // supplied to the CorrectableStreamEmpty2 method at call time, and may or may not // be used by the quorum function. If the in parameter is not needed - // you should implement your quorum function with '_ *emptypb.Empty'. - CorrectableStreamEmpty2QF(in *emptypb.Empty, replies map[uint32]*Response) (*Response, int, bool) + // you should implement your quorum function with '_ *empty.Empty'. + CorrectableStreamEmpty2QF(in *empty.Empty, replies map[uint32]*Response) (*Response, int, bool) } diff --git a/cmd/protoc-gen-gorums/dev/zorums_quorumcall_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_quorumcall_gorums.pb.go index 181b2970..afdca89a 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_quorumcall_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_quorumcall_gorums.pb.go @@ -1,16 +1,16 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.7.0-devel -// protoc v4.24.4 +// protoc v3.12.4 // source: zorums.proto package dev import ( context "context" + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" protoreflect "google.golang.org/protobuf/reflect/protoreflect" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -111,7 +111,7 @@ func (c *Configuration) QuorumCallCombo(ctx context.Context, in *Request, f func } // QuorumCallEmpty for testing imported message type. -func (c *Configuration) QuorumCallEmpty(ctx context.Context, in *emptypb.Empty) (resp *Response, err error) { +func (c *Configuration) QuorumCallEmpty(ctx context.Context, in *empty.Empty) (resp *Response, err error) { cd := gorums.QuorumCallData{ Message: in, Method: "dev.ZorumsService.QuorumCallEmpty", @@ -121,7 +121,7 @@ func (c *Configuration) QuorumCallEmpty(ctx context.Context, in *emptypb.Empty) for k, v := range replies { r[k] = v.(*Response) } - return c.qspec.QuorumCallEmptyQF(req.(*emptypb.Empty), r) + return c.qspec.QuorumCallEmptyQF(req.(*empty.Empty), r) } res, err := c.RawConfiguration.QuorumCall(ctx, cd) @@ -132,15 +132,15 @@ func (c *Configuration) QuorumCallEmpty(ctx context.Context, in *emptypb.Empty) } // QuorumCallEmpty2 for testing imported message type. -func (c *Configuration) QuorumCallEmpty2(ctx context.Context, in *Request) (resp *emptypb.Empty, err error) { +func (c *Configuration) QuorumCallEmpty2(ctx context.Context, in *Request) (resp *empty.Empty, err error) { cd := gorums.QuorumCallData{ Message: in, Method: "dev.ZorumsService.QuorumCallEmpty2", } cd.QuorumFunction = func(req protoreflect.ProtoMessage, replies map[uint32]protoreflect.ProtoMessage) (protoreflect.ProtoMessage, bool) { - r := make(map[uint32]*emptypb.Empty, len(replies)) + r := make(map[uint32]*empty.Empty, len(replies)) for k, v := range replies { - r[k] = v.(*emptypb.Empty) + r[k] = v.(*empty.Empty) } return c.qspec.QuorumCallEmpty2QF(req.(*Request), r) } @@ -149,5 +149,5 @@ func (c *Configuration) QuorumCallEmpty2(ctx context.Context, in *Request) (resp if err != nil { return nil, err } - return res.(*emptypb.Empty), err + return res.(*empty.Empty), err } diff --git a/cmd/protoc-gen-gorums/dev/zorums_rpc_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_rpc_gorums.pb.go index b5aff4a2..32a1b77d 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_rpc_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_rpc_gorums.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.7.0-devel -// protoc v4.24.4 +// protoc v3.12.4 // source: zorums.proto package dev diff --git a/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go index 4f6bd860..78b65078 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_server_gorums.pb.go @@ -1,16 +1,16 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.7.0-devel -// protoc v4.24.4 +// protoc v3.12.4 // source: zorums.proto package dev import ( + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" ordering "github.com/relab/gorums/ordering" proto "google.golang.org/protobuf/proto" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -27,32 +27,32 @@ type ZorumsService interface { QuorumCallPerNodeArg(ctx gorums.ServerCtx, request *Request) (response *Response, err error) QuorumCallCustomReturnType(ctx gorums.ServerCtx, request *Request) (response *Response, err error) QuorumCallCombo(ctx gorums.ServerCtx, request *Request) (response *Response, err error) - QuorumCallEmpty(ctx gorums.ServerCtx, request *emptypb.Empty) (response *Response, err error) - QuorumCallEmpty2(ctx gorums.ServerCtx, request *Request) (response *emptypb.Empty, err error) + QuorumCallEmpty(ctx gorums.ServerCtx, request *empty.Empty) (response *Response, err error) + QuorumCallEmpty2(ctx gorums.ServerCtx, request *Request) (response *empty.Empty, err error) Multicast(ctx gorums.ServerCtx, request *Request) MulticastPerNodeArg(ctx gorums.ServerCtx, request *Request) Multicast2(ctx gorums.ServerCtx, request *Request) Multicast3(ctx gorums.ServerCtx, request *Request) - Multicast4(ctx gorums.ServerCtx, request *emptypb.Empty) + Multicast4(ctx gorums.ServerCtx, request *empty.Empty) QuorumCallAsync(ctx gorums.ServerCtx, request *Request) (response *Response, err error) QuorumCallAsyncPerNodeArg(ctx gorums.ServerCtx, request *Request) (response *Response, err error) QuorumCallAsyncCustomReturnType(ctx gorums.ServerCtx, request *Request) (response *Response, err error) QuorumCallAsyncCombo(ctx gorums.ServerCtx, request *Request) (response *Response, err error) QuorumCallAsync2(ctx gorums.ServerCtx, request *Request) (response *Response, err error) - QuorumCallAsyncEmpty(ctx gorums.ServerCtx, request *Request) (response *emptypb.Empty, err error) - QuorumCallAsyncEmpty2(ctx gorums.ServerCtx, request *emptypb.Empty) (response *Response, err error) + QuorumCallAsyncEmpty(ctx gorums.ServerCtx, request *Request) (response *empty.Empty, err error) + QuorumCallAsyncEmpty2(ctx gorums.ServerCtx, request *empty.Empty) (response *Response, err error) Correctable(ctx gorums.ServerCtx, request *Request) (response *Response, err error) CorrectablePerNodeArg(ctx gorums.ServerCtx, request *Request) (response *Response, err error) CorrectableCustomReturnType(ctx gorums.ServerCtx, request *Request) (response *Response, err error) CorrectableCombo(ctx gorums.ServerCtx, request *Request) (response *Response, err error) - CorrectableEmpty(ctx gorums.ServerCtx, request *Request) (response *emptypb.Empty, err error) - CorrectableEmpty2(ctx gorums.ServerCtx, request *emptypb.Empty) (response *Response, err error) + CorrectableEmpty(ctx gorums.ServerCtx, request *Request) (response *empty.Empty, err error) + CorrectableEmpty2(ctx gorums.ServerCtx, request *empty.Empty) (response *Response, err error) CorrectableStream(ctx gorums.ServerCtx, request *Request, send func(response *Response) error) error CorrectableStreamPerNodeArg(ctx gorums.ServerCtx, request *Request, send func(response *Response) error) error CorrectableStreamCustomReturnType(ctx gorums.ServerCtx, request *Request, send func(response *Response) error) error CorrectableStreamCombo(ctx gorums.ServerCtx, request *Request, send func(response *Response) error) error - CorrectableStreamEmpty(ctx gorums.ServerCtx, request *Request, send func(response *emptypb.Empty) error) error - CorrectableStreamEmpty2(ctx gorums.ServerCtx, request *emptypb.Empty, send func(response *Response) error) error + CorrectableStreamEmpty(ctx gorums.ServerCtx, request *Request, send func(response *empty.Empty) error) error + CorrectableStreamEmpty2(ctx gorums.ServerCtx, request *empty.Empty, send func(response *Response) error) error Unicast(ctx gorums.ServerCtx, request *Request) Unicast2(ctx gorums.ServerCtx, request *Request) } @@ -89,7 +89,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallEmpty", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { - req := in.Message.(*emptypb.Empty) + req := in.Message.(*empty.Empty) defer ctx.Release() resp, err := impl.QuorumCallEmpty(ctx, req) gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) @@ -121,7 +121,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { impl.Multicast3(ctx, req) }) srv.RegisterHandler("dev.ZorumsService.Multicast4", func(ctx gorums.ServerCtx, in *gorums.Message, _ chan<- *gorums.Message) { - req := in.Message.(*emptypb.Empty) + req := in.Message.(*empty.Empty) defer ctx.Release() impl.Multicast4(ctx, req) }) @@ -162,7 +162,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.QuorumCallAsyncEmpty2", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { - req := in.Message.(*emptypb.Empty) + req := in.Message.(*empty.Empty) defer ctx.Release() resp, err := impl.QuorumCallAsyncEmpty2(ctx, req) gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) @@ -198,7 +198,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) }) srv.RegisterHandler("dev.ZorumsService.CorrectableEmpty2", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { - req := in.Message.(*emptypb.Empty) + req := in.Message.(*empty.Empty) defer ctx.Release() resp, err := impl.CorrectableEmpty2(ctx, req) gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) @@ -254,7 +254,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { srv.RegisterHandler("dev.ZorumsService.CorrectableStreamEmpty", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*Request) defer ctx.Release() - err := impl.CorrectableStreamEmpty(ctx, req, func(resp *emptypb.Empty) error { + err := impl.CorrectableStreamEmpty(ctx, req, func(resp *empty.Empty) error { // create a copy of the metadata, to avoid a data race between WrapMessage and SendMsg md := proto.Clone(in.Metadata) return gorums.SendMessage(ctx, finished, gorums.WrapMessage(md.(*ordering.Metadata), resp, nil)) @@ -264,7 +264,7 @@ func RegisterZorumsServiceServer(srv *gorums.Server, impl ZorumsService) { } }) srv.RegisterHandler("dev.ZorumsService.CorrectableStreamEmpty2", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { - req := in.Message.(*emptypb.Empty) + req := in.Message.(*empty.Empty) defer ctx.Release() err := impl.CorrectableStreamEmpty2(ctx, req, func(resp *Response) error { // create a copy of the metadata, to avoid a data race between WrapMessage and SendMsg diff --git a/cmd/protoc-gen-gorums/dev/zorums_types_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_types_gorums.pb.go index 30efcd28..1983654e 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_types_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_types_gorums.pb.go @@ -1,14 +1,14 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.7.0-devel -// protoc v4.24.4 +// protoc v3.12.4 // source: zorums.proto package dev import ( + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -20,7 +20,7 @@ const ( type internalEmpty struct { nid uint32 - reply *emptypb.Empty + reply *empty.Empty err error } @@ -37,12 +37,12 @@ type AsyncEmpty struct { // Get returns the reply and any error associated with the called method. // The method blocks until a reply or error is available. -func (f *AsyncEmpty) Get() (*emptypb.Empty, error) { +func (f *AsyncEmpty) Get() (*empty.Empty, error) { resp, err := f.Async.Get() if err != nil { return nil, err } - return resp.(*emptypb.Empty), err + return resp.(*empty.Empty), err } // AsyncMyResponse is a async object for processing replies. @@ -85,12 +85,12 @@ type CorrectableEmpty struct { // intermediate) reply or error is available. Level is set to LevelNotSet if no // reply has yet been received. The Done or Watch methods should be used to // ensure that a reply is available. -func (c *CorrectableEmpty) Get() (*emptypb.Empty, int, error) { +func (c *CorrectableEmpty) Get() (*empty.Empty, int, error) { resp, level, err := c.Correctable.Get() if err != nil { return nil, level, err } - return resp.(*emptypb.Empty), level, err + return resp.(*empty.Empty), level, err } // CorrectableMyResponse is a correctable object for processing replies. @@ -139,12 +139,12 @@ type CorrectableStreamEmpty struct { // intermediate) reply or error is available. Level is set to LevelNotSet if no // reply has yet been received. The Done or Watch methods should be used to // ensure that a reply is available. -func (c *CorrectableStreamEmpty) Get() (*emptypb.Empty, int, error) { +func (c *CorrectableStreamEmpty) Get() (*empty.Empty, int, error) { resp, level, err := c.Correctable.Get() if err != nil { return nil, level, err } - return resp.(*emptypb.Empty), level, err + return resp.(*empty.Empty), level, err } // CorrectableStreamMyResponse is a correctable object for processing replies. diff --git a/cmd/protoc-gen-gorums/dev/zorums_unicast_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_unicast_gorums.pb.go index 9ad1ee1e..e5291e85 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_unicast_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_unicast_gorums.pb.go @@ -1,15 +1,15 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.7.0-devel -// protoc v4.24.4 +// protoc v3.12.4 // source: zorums.proto package dev import ( context "context" + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -31,7 +31,7 @@ func (n *Node) Unicast(ctx context.Context, in *Request, opts ...gorums.CallOpti } // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // Unicast2 is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. From d904f3d9e6e0be3ab55da6e86d63467d192e6f03 Mon Sep 17 00:00:00 2001 From: aleksander-vedvik Date: Wed, 20 Mar 2024 00:01:12 +0100 Subject: [PATCH 03/18] refactor: changed connEstablished to atomicflag and added documentation --- channel.go | 54 ++++++++++++++++++++++++++++++++++++++---------------- mgr.go | 2 +- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/channel.go b/channel.go index 38dcd872..6b94c4a1 100644 --- a/channel.go +++ b/channel.go @@ -46,12 +46,12 @@ type channel struct { gorumsStream ordering.Gorums_NodeStreamClient streamMut sync.RWMutex streamBroken atomicFlag + connEstablished atomicFlag parentCtx context.Context streamCtx context.Context cancelStream context.CancelFunc responseRouters map[uint64]responseRouter responseMut sync.Mutex - connEstablished bool } func newChannel(n *RawNode) *channel { @@ -62,19 +62,28 @@ func newChannel(n *RawNode) *channel { latency: -1 * time.Second, rand: rand.New(rand.NewSource(time.Now().UnixNano())), responseRouters: make(map[uint64]responseRouter), - connEstablished: false, } } func (c *channel) connect(ctx context.Context, conn *grpc.ClientConn) error { + // the parentCtx governs the channel and is used to properly + // shut it down c.parentCtx = ctx + // it is important to start the goroutine regardless of a + // successful connection to prevent a deadlock when + // invoking one of the call types. The method provides + // a listener on the sendQ and contains the retry logic go c.sendMsgs() + // no need to proceed if dial setup failed if conn == nil { return fmt.Errorf("connection is nil") } return c.tryConnect(conn) } +// creating a stream could fail even though conn != nil due to +// the non-blocking dial. Hence, we need to try to connect to +// the node before starting the receiving goroutine func (c *channel) tryConnect(conn *grpc.ClientConn) error { var err error c.streamCtx, c.cancelStream = context.WithCancel(c.parentCtx) @@ -83,7 +92,9 @@ func (c *channel) tryConnect(conn *grpc.ClientConn) error { if err != nil { return err } - c.connEstablished = true + // connEstablished indicates whether recvMsgs have been started or not and if + // the dial was successful. streamBroken only reports the status of the stream. + c.connEstablished.set() go c.recvMsgs() return nil } @@ -106,7 +117,14 @@ func (c *channel) enqueue(req request, responseChan chan<- response, streaming b c.responseRouters[req.msg.Metadata.MessageID] = responseRouter{responseChan, streaming} c.responseMut.Unlock() } - c.sendQ <- req + // either enqueue the request on the sendQ or respond + // with error if the node is closed + select { + case <-c.parentCtx.Done(): + c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), msg: nil, err: fmt.Errorf("channel closed")}) + return + case c.sendQ <- req: + } } func (c *channel) deleteRouter(msgID uint64) { @@ -202,7 +220,7 @@ func (c *channel) recvMsgs() { c.streamMut.RUnlock() c.setLastErr(err) // attempt to reconnect - c.reconnect() + c.reconnect(-1) } else { c.streamMut.RUnlock() err := status.FromProto(resp.Metadata.GetStatus()).Err() @@ -219,12 +237,16 @@ func (c *channel) recvMsgs() { func (c *channel) tryReconnect() { // a connection has never been established - if !c.connEstablished { + if !c.connEstablished.get() { + // the setup stage when dialing could have + // previously failed and we thus need to + // make a connection is up. err := c.node.dial() if err != nil { c.streamBroken.set() return } + // try to create a stream err = c.tryConnect(c.node.conn) if err != nil { c.streamBroken.set() @@ -239,15 +261,12 @@ func (c *channel) tryReconnect() { } } -func (c *channel) reconnect(maxRetries ...int) { +// maxRetries = -1 represents infinite retries +func (c *channel) reconnect(maxRetries float64) { c.streamMut.Lock() defer c.streamMut.Unlock() backoffCfg := c.backoffCfg - var maxretries float64 = -1 - if len(maxRetries) > 0 { - maxretries = float64(maxRetries[0]) - } var retries float64 for { var err error @@ -260,7 +279,7 @@ func (c *channel) reconnect(maxRetries ...int) { } c.cancelStream() c.setLastErr(err) - if retries >= maxretries && maxretries > 0 { + if retries >= maxRetries && maxRetries > 0 { c.streamBroken.set() return } @@ -300,12 +319,15 @@ func (c *channel) channelLatency() time.Duration { return c.latency } -type atomicFlag struct { - flag int32 +func (c *channel) isConnected() bool { + // streamBroken.get() is initially false and NodeStream could be down + // even though node.conn is not nil. Hence, we need connEstablished + // to make sure a proper connection has been made. + return c.connEstablished.get() && !c.streamBroken.get() } -func (c *channel) isConnected() bool { - return c.connEstablished && !c.streamBroken.get() +type atomicFlag struct { + flag int32 } func (f *atomicFlag) set() { atomic.StoreInt32(&f.flag, 1) } diff --git a/mgr.go b/mgr.go index 2b1914a7..63285d90 100644 --- a/mgr.go +++ b/mgr.go @@ -120,7 +120,7 @@ func (m *RawManager) AddNode(node *RawNode) error { } if err := node.connect(m); err != nil { if m.logger != nil { - m.logger.Println(fmt.Errorf("connection failed for %s: %w. will retry later.", node, err)) + m.logger.Printf("Failed to connect to %s: %v (retrying)", node, err) } } From 55d70fb930686d5139401383a75605cc2a83371f Mon Sep 17 00:00:00 2001 From: aleksander-vedvik Date: Wed, 20 Mar 2024 00:02:23 +0100 Subject: [PATCH 04/18] fix: ctx will be canceled when node is closed and added documentation --- node.go | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/node.go b/node.go index da0877b3..713554c8 100644 --- a/node.go +++ b/node.go @@ -61,40 +61,40 @@ func NewRawNodeWithID(addr string, id uint32) (*RawNode, error) { // connect to this node and associate it with the manager. func (n *RawNode) connect(mgr *RawManager) error { n.mgr = mgr - n.channel = newChannel(n) if n.mgr.opts.noConnect { return nil } - // ignoring the error because it will try to reconnect - // at a later time. + n.channel = newChannel(n) + // ignoring the error because it will try to reconnect later _ = n.dial() - ctx := n.ctxSetup() + ctx := n.newContext() if err := n.channel.connect(ctx, n.conn); err != nil { - return fmt.Errorf("starting stream failed: %w", err) + return fmt.Errorf("failed to start stream: %w", err) } return nil } // dials the node if it has not been done previously func (n *RawNode) dial() error { - if n.conn == nil { - var err error - ctx, cancel := context.WithTimeout(context.Background(), n.mgr.opts.nodeDialTimeout) - defer cancel() - // error is ignored because we will retry the dial at a later time - n.conn, err = grpc.DialContext(ctx, n.addr, n.mgr.opts.grpcDialOpts...) - return err + // dial has previously succeded so we can do nothing + if n.conn != nil { + return nil } - return nil + var err error + ctx, cancel := context.WithTimeout(context.Background(), n.mgr.opts.nodeDialTimeout) + defer cancel() + n.conn, err = grpc.DialContext(ctx, n.addr, n.mgr.opts.grpcDialOpts...) + return err } -// creates a context that governs the channel. It is -// used to stop all channel goroutines and the NodeStream. +// newContext returns a new context for this node's channel. +// This context is used by the channel implementation to stop +// all goroutines and the NodeStream, when the context is canceled. // -// this method should be run for each connection to ensure +// This method must be called for each connection to ensure // fresh contexts. Reusing contexts could result in reusing // a cancelled context. -func (n *RawNode) ctxSetup() context.Context { +func (n *RawNode) newContext() context.Context { md := n.mgr.opts.metadata.Copy() if n.mgr.opts.perNodeMD != nil { md = metadata.Join(md, n.mgr.opts.perNodeMD(n.id)) @@ -107,13 +107,14 @@ func (n *RawNode) ctxSetup() context.Context { // close this node. func (n *RawNode) close() error { + // important to cancel first to stop goroutines + n.cancel() if n.conn == nil { return nil } if err := n.conn.Close(); err != nil { return fmt.Errorf("%d: conn close error: %w", n.id, err) } - n.cancel() return nil } From dcc75e68514b5ec568ef151ecedf4b447260e9b6 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Wed, 20 Mar 2024 23:37:49 +0100 Subject: [PATCH 05/18] doc: mainly added a few doc comments and move if conn == nil The only code change is the move of "if conn == nil" from connect() to tryConnect(). It seems more logical to have this "guard check" at the top of a method, rather than right after starting the sendMsgs goroutine. There should be no semantic differences here, but who knows... this code is a bit intricate. Also adds some doc comment above some (internal) functions, mainly for documentation purposes and some language polish and typos. --- channel.go | 22 +++++++++++----------- node.go | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/channel.go b/channel.go index 6b94c4a1..d221e064 100644 --- a/channel.go +++ b/channel.go @@ -74,10 +74,6 @@ func (c *channel) connect(ctx context.Context, conn *grpc.ClientConn) error { // invoking one of the call types. The method provides // a listener on the sendQ and contains the retry logic go c.sendMsgs() - // no need to proceed if dial setup failed - if conn == nil { - return fmt.Errorf("connection is nil") - } return c.tryConnect(conn) } @@ -85,6 +81,10 @@ func (c *channel) connect(ctx context.Context, conn *grpc.ClientConn) error { // the non-blocking dial. Hence, we need to try to connect to // the node before starting the receiving goroutine func (c *channel) tryConnect(conn *grpc.ClientConn) error { + if conn == nil { + // no need to proceed if dial setup failed + return fmt.Errorf("connection is nil") + } var err error c.streamCtx, c.cancelStream = context.WithCancel(c.parentCtx) c.gorumsClient = ordering.NewGorumsClient(conn) @@ -192,7 +192,7 @@ func (c *channel) sendMsgs() { // try to connect to the node if previous attempts // have failed or if the node has disconnected if !c.isConnected() { - // streamBroken will be set if the connection fails + // streamBroken will be set if the reconnection fails c.tryReconnect() } // return error if stream is broken @@ -236,11 +236,10 @@ func (c *channel) recvMsgs() { } func (c *channel) tryReconnect() { - // a connection has never been established if !c.connEstablished.get() { - // the setup stage when dialing could have - // previously failed and we thus need to - // make a connection is up. + // a connection has never been established; i.e., + // a previous dial attempt could have failed. + // we need to make sure the connection is up. err := c.node.dial() if err != nil { c.streamBroken.set() @@ -261,7 +260,8 @@ func (c *channel) tryReconnect() { } } -// maxRetries = -1 represents infinite retries +// reconnect tries to reconnect to the node using an exponential backoff strategy. +// maxRetries = -1 represents infinite retries. func (c *channel) reconnect(maxRetries float64) { c.streamMut.Lock() defer c.streamMut.Unlock() @@ -270,7 +270,6 @@ func (c *channel) reconnect(maxRetries float64) { var retries float64 for { var err error - c.streamCtx, c.cancelStream = context.WithCancel(c.parentCtx) c.gorumsStream, err = c.gorumsClient.NodeStream(c.streamCtx) if err == nil { @@ -319,6 +318,7 @@ func (c *channel) channelLatency() time.Duration { return c.latency } +// isConnected returns true if the channel has an active connection to the node. func (c *channel) isConnected() bool { // streamBroken.get() is initially false and NodeStream could be down // even though node.conn is not nil. Hence, we need connEstablished diff --git a/node.go b/node.go index 713554c8..456851a3 100644 --- a/node.go +++ b/node.go @@ -74,9 +74,9 @@ func (n *RawNode) connect(mgr *RawManager) error { return nil } -// dials the node if it has not been done previously +// dial the node unless it has already been done. func (n *RawNode) dial() error { - // dial has previously succeded so we can do nothing + // dial has previously succeeded so we do nothing if n.conn != nil { return nil } From e78895cc0b0f25c80010242a1725221bad652e20 Mon Sep 17 00:00:00 2001 From: aleksander-vedvik Date: Thu, 21 Mar 2024 09:50:03 +0100 Subject: [PATCH 06/18] refactor(connection): guardclause and reduced lock time Added a guardclause to prevent goroutines being started more than once. Also, the channel will only lock when trying to recreate a nodestream, making it possible to retry when sending messages. --- channel.go | 54 +++++++++++++++++++++++++++++++++++++++++++++--------- node.go | 5 +++-- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/channel.go b/channel.go index 6b94c4a1..f1465304 100644 --- a/channel.go +++ b/channel.go @@ -92,13 +92,30 @@ func (c *channel) tryConnect(conn *grpc.ClientConn) error { if err != nil { return err } - // connEstablished indicates whether recvMsgs have been started or not and if - // the dial was successful. streamBroken only reports the status of the stream. - c.connEstablished.set() - go c.recvMsgs() + c.streamBroken.clear() + // safe guard because creating more than one recvMsgs goroutine is problematic + if !c.connEstablished.get() { + // connEstablished indicates whether recvMsgs have been started or not and if + // the dial was successful. streamBroken only reports the status of the stream. + c.connEstablished.set() + go c.recvMsgs() + } return nil } +func (c *channel) cancelPendingMsgs() { + c.responseMut.Lock() + defer c.responseMut.Unlock() + for msgID, router := range c.responseRouters { + err := status.Errorf(codes.Unavailable, "stream is down") + router.c <- response{nid: c.node.ID(), msg: nil, err: err} + // delete the router if we are only expecting a single message + if !router.streaming { + delete(c.responseRouters, msgID) + } + } +} + func (c *channel) routeResponse(msgID uint64, resp response) { c.responseMut.Lock() defer c.responseMut.Unlock() @@ -219,7 +236,14 @@ func (c *channel) recvMsgs() { c.streamBroken.set() c.streamMut.RUnlock() c.setLastErr(err) - // attempt to reconnect + // The only time we reach this point is when the + // stream goes down AFTER a message has been sent and the node + // is waiting for a reply. We thus need to respond with a stream + // is down error on all pending messages. + c.cancelPendingMsgs() + // attempt to reconnect. It will try to reconnect indefinitely + // or until the node is closed. This is necessary when streaming + // is enabled. c.reconnect(-1) } else { c.streamMut.RUnlock() @@ -246,7 +270,11 @@ func (c *channel) tryReconnect() { c.streamBroken.set() return } - // try to create a stream + // try to create a stream. Should NOT be + // run if a connection has previously + // been established because it will start + // a the recvMsgs goroutine. Otherwise, we + // could suffer from leaking goroutines. err = c.tryConnect(c.node.conn) if err != nil { c.streamBroken.set() @@ -256,28 +284,36 @@ func (c *channel) tryReconnect() { // the node has previously been connected // but is now disconnected if c.streamBroken.get() { - // try to reconnect only once + // try to reconnect only once. + // Maybe add this as a user option? c.reconnect(1) } } // maxRetries = -1 represents infinite retries func (c *channel) reconnect(maxRetries float64) { - c.streamMut.Lock() - defer c.streamMut.Unlock() backoffCfg := c.backoffCfg var retries float64 for { var err error + c.streamMut.Lock() + // check if stream is already up + if !c.streamBroken.get() { + // do nothing because stream is up + c.streamMut.Unlock() + return + } c.streamCtx, c.cancelStream = context.WithCancel(c.parentCtx) c.gorumsStream, err = c.gorumsClient.NodeStream(c.streamCtx) if err == nil { c.streamBroken.clear() + c.streamMut.Unlock() return } c.cancelStream() + c.streamMut.Unlock() c.setLastErr(err) if retries >= maxRetries && maxRetries > 0 { c.streamBroken.set() diff --git a/node.go b/node.go index 713554c8..bb51f414 100644 --- a/node.go +++ b/node.go @@ -76,9 +76,10 @@ func (n *RawNode) connect(mgr *RawManager) error { // dials the node if it has not been done previously func (n *RawNode) dial() error { - // dial has previously succeded so we can do nothing + // dial has previously succeded but creating a stream failed + // so we need to close it before creating a new. if n.conn != nil { - return nil + n.conn.Close() } var err error ctx, cancel := context.WithTimeout(context.Background(), n.mgr.opts.nodeDialTimeout) From d487c5f67f320cc6d60ce2f1b6a2e7ae53c329aa Mon Sep 17 00:00:00 2001 From: aleksander-vedvik Date: Thu, 21 Mar 2024 09:53:37 +0100 Subject: [PATCH 07/18] test(channel): added test for the channel --- channel_test.go | 188 ++++++++++++++++++++++++++++++++++++++ testing_gorums.go | 21 +++++ tests/mock/mock.pb.go | 204 ++++++++++++++++++++++++++++++++++++++++++ tests/mock/mock.proto | 10 +++ 4 files changed, 423 insertions(+) create mode 100644 channel_test.go create mode 100644 tests/mock/mock.pb.go create mode 100644 tests/mock/mock.proto diff --git a/channel_test.go b/channel_test.go new file mode 100644 index 00000000..925b1f2b --- /dev/null +++ b/channel_test.go @@ -0,0 +1,188 @@ +package gorums + +import ( + "context" + "testing" + "time" + + "github.com/relab/gorums/ordering" + "github.com/relab/gorums/tests/mock" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/encoding" +) + +type mockSrv struct{} + +func (mockSrv) Test(ctx ServerCtx, _ *mock.Request) (resp *mock.Response, err error) { + return nil, nil +} + +func dummyMgr() *RawManager { + return NewRawManager( + WithGrpcDialOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + ), + ) +} + +var handlerName = "mock.Server.Test" + +func dummySrv() *Server { + mockSrv := &mockSrv{} + srv := NewServer() + srv.RegisterHandler(handlerName, func(ctx ServerCtx, in *Message, finished chan<- *Message) { + req := in.Message.(*mock.Request) + defer ctx.Release() + resp, err := mockSrv.Test(ctx, req) + SendMessage(ctx, finished, WrapMessage(in.Metadata, resp, err)) + }) + return srv +} + +func TestChannelCreation(t *testing.T) { + node, err := NewRawNode("127.0.0.1:5000") + if err != nil { + t.Fatal(err) + } + mgr := dummyMgr() + defer mgr.Close() + // a proper connection should NOT be esablished here + node.connect(mgr) + + replyChan := make(chan response, 1) + go func() { + md := &ordering.Metadata{MessageID: 1, Method: handlerName} + req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}} + node.channel.enqueue(req, replyChan, false) + }() + select { + case <-replyChan: + case <-time.After(3 * time.Second): + t.Fatal("deadlock: impossible to enqueue messages to the node") + } +} + +func TestChannelSuccessfulConnection(t *testing.T) { + addrs, teardown := TestSetup(t, 1, func(_ int) ServerIface { + return dummySrv() + }) + defer teardown() + mgr := dummyMgr() + defer mgr.Close() + node, err := NewRawNode(addrs[0]) + if err != nil { + t.Fatal(err) + } + + if err = mgr.AddNode(node); err != nil { + t.Fatal(err) + } + if len(mgr.Nodes()) < 1 { + t.Fatal("the node was not added to the configuration") + } + if !node.channel.isConnected() { + t.Fatal("a connection could not be made to a live node") + } + if node.conn == nil { + t.Fatal("connection should not be nil") + } +} + +func TestChannelUnsuccessfulConnection(t *testing.T) { + mgr := dummyMgr() + defer mgr.Close() + // no servers are listening on the given address + node, err := NewRawNode("127.0.0.1:5000") + if err != nil { + t.Fatal(err) + } + + // the node should still be added to the configuration + if err = mgr.AddNode(node); err != nil { + t.Fatal(err) + } + if len(mgr.Nodes()) < 1 { + t.Fatal("the node was not added to the configuration") + } + if node.conn == nil { + t.Fatal("connection should not be nil when NOT using WithBlock()") + } +} + +func TestChannelReconnection(t *testing.T) { + if encoding.GetCodec(ContentSubtype) == nil { + encoding.RegisterCodec(NewCodec()) + } + srvAddr := "127.0.0.1:5000" + // wait to start the server + startServer, stopServer := TestServerSetup(t, srvAddr, dummySrv()) + node, err := NewRawNode(srvAddr) + if err != nil { + t.Fatal(err) + } + mgr := dummyMgr() + defer mgr.Close() + // a proper connection should NOT be esablished here because server is not started + node.connect(mgr) + + // send first message when server is down + replyChan1 := make(chan response, 1) + go func() { + md := &ordering.Metadata{MessageID: 1, Method: handlerName} + req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}} + node.channel.enqueue(req, replyChan1, false) + }() + + // check response: should be error because server is down + select { + case resp := <-replyChan1: + if resp.err == nil { + t.Fatal("should have received an error") + } + case <-time.After(3 * time.Second): + t.Fatal("deadlock: impossible to enqueue messages to the node") + } + + // start the server + startServer() + + // send second message when server is up + replyChan2 := make(chan response, 1) + go func() { + md := &ordering.Metadata{MessageID: 2, Method: handlerName} + req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}, opts: getCallOptions(E_Multicast, nil)} + node.channel.enqueue(req, replyChan2, false) + }() + + // check response: error should be nil because server is up + select { + case resp := <-replyChan2: + if resp.err != nil { + t.Fatal(resp.err) + } + case <-time.After(3 * time.Second): + t.Fatal("deadlock: impossible to enqueue messages to the node") + } + + // stop the server + stopServer() + + // send third message when server has been previously up but is now down + replyChan3 := make(chan response, 1) + go func() { + md := &ordering.Metadata{MessageID: 3, Method: handlerName} + req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}} + node.channel.enqueue(req, replyChan3, false) + }() + + // check response: should be error because server is down + select { + case resp3 := <-replyChan3: + if resp3.err == nil { + t.Fatal("should have received an error", resp3.msg) + } + case <-time.After(3 * time.Second): + t.Fatal("deadlock: impossible to enqueue messages to the node") + } +} diff --git a/testing_gorums.go b/testing_gorums.go index 01dfa9dc..c725d5a2 100644 --- a/testing_gorums.go +++ b/testing_gorums.go @@ -3,6 +3,7 @@ package gorums import ( "net" "testing" + "time" ) // ServerIface is the interface that must be implemented by a server in order to support the TestSetup function. @@ -37,3 +38,23 @@ func TestSetup(t testing.TB, numServers int, srvFn func(i int) ServerIface) ([]s } return addrs, stopFn } + +func TestServerSetup(t testing.TB, addr string, srv ServerIface) (func(), func()) { + t.Helper() + var lis net.Listener + var err error + startFn := func() { + lis, err = net.Listen("tcp", addr) + if err != nil { + t.Fatalf("Failed to listen on port: %v", err) + } + go func() { _ = srv.Serve(lis) }() + // to ensure that the server has started + time.Sleep(100 * time.Millisecond) + } + stopFn := func() { + lis.Close() + srv.Stop() + } + return startFn, stopFn +} diff --git a/tests/mock/mock.pb.go b/tests/mock/mock.pb.go new file mode 100644 index 00000000..5808077e --- /dev/null +++ b/tests/mock/mock.pb.go @@ -0,0 +1,204 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc v3.12.4 +// source: mock.proto + +package mock + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Val string `protobuf:"bytes,1,opt,name=val,proto3" json:"val,omitempty"` +} + +func (x *Request) Reset() { + *x = Request{} + if protoimpl.UnsafeEnabled { + mi := &file_mock_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Request) ProtoMessage() {} + +func (x *Request) ProtoReflect() protoreflect.Message { + mi := &file_mock_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { + return file_mock_proto_rawDescGZIP(), []int{0} +} + +func (x *Request) GetVal() string { + if x != nil { + return x.Val + } + return "" +} + +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Val string `protobuf:"bytes,1,opt,name=val,proto3" json:"val,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_mock_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_mock_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_mock_proto_rawDescGZIP(), []int{1} +} + +func (x *Response) GetVal() string { + if x != nil { + return x.Val + } + return "" +} + +var File_mock_proto protoreflect.FileDescriptor + +var file_mock_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x6d, 0x6f, 0x63, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x6d, 0x6f, + 0x63, 0x6b, 0x22, 0x1b, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, + 0x03, 0x76, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x76, 0x61, 0x6c, 0x22, + 0x1c, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x76, + 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x76, 0x61, 0x6c, 0x42, 0x24, 0x5a, + 0x22, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x6c, 0x61, + 0x62, 0x2f, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x73, 0x2f, 0x6d, + 0x6f, 0x63, 0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_mock_proto_rawDescOnce sync.Once + file_mock_proto_rawDescData = file_mock_proto_rawDesc +) + +func file_mock_proto_rawDescGZIP() []byte { + file_mock_proto_rawDescOnce.Do(func() { + file_mock_proto_rawDescData = protoimpl.X.CompressGZIP(file_mock_proto_rawDescData) + }) + return file_mock_proto_rawDescData +} + +var file_mock_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_mock_proto_goTypes = []interface{}{ + (*Request)(nil), // 0: mock.Request + (*Response)(nil), // 1: mock.Response +} +var file_mock_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_mock_proto_init() } +func file_mock_proto_init() { + if File_mock_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_mock_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mock_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_mock_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_mock_proto_goTypes, + DependencyIndexes: file_mock_proto_depIdxs, + MessageInfos: file_mock_proto_msgTypes, + }.Build() + File_mock_proto = out.File + file_mock_proto_rawDesc = nil + file_mock_proto_goTypes = nil + file_mock_proto_depIdxs = nil +} diff --git a/tests/mock/mock.proto b/tests/mock/mock.proto new file mode 100644 index 00000000..b5a47c9e --- /dev/null +++ b/tests/mock/mock.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; +package mock; +option go_package = "github.com/relab/gorums/tests/mock"; + +message Request { + string val = 1; +} +message Response { + string val = 1; +} From 720f01a1e5482b336e01df5438f49e53f95af8d4 Mon Sep 17 00:00:00 2001 From: aleksander-vedvik Date: Thu, 21 Mar 2024 10:20:18 +0100 Subject: [PATCH 08/18] fix(channel): added stream mutex on initial channel creation --- channel.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/channel.go b/channel.go index 81d5b972..b90fea04 100644 --- a/channel.go +++ b/channel.go @@ -85,10 +85,12 @@ func (c *channel) tryConnect(conn *grpc.ClientConn) error { // no need to proceed if dial setup failed return fmt.Errorf("connection is nil") } + c.streamMut.Lock() var err error c.streamCtx, c.cancelStream = context.WithCancel(c.parentCtx) c.gorumsClient = ordering.NewGorumsClient(conn) c.gorumsStream, err = c.gorumsClient.NodeStream(c.streamCtx) + c.streamMut.Unlock() if err != nil { return err } @@ -272,8 +274,10 @@ func (c *channel) tryReconnect() { // try to create a stream. Should NOT be // run if a connection has previously // been established because it will start - // a the recvMsgs goroutine. Otherwise, we + // a recvMsgs goroutine. Otherwise, we // could suffer from leaking goroutines. + // a guardclause has been added in the + // method to prevent this. err = c.tryConnect(c.node.conn) if err != nil { c.streamBroken.set() From 52979777b2111afa8220d8e9229e7173aeb786c7 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 23 Mar 2024 21:56:16 +0100 Subject: [PATCH 09/18] doc: revised the comments and added some doc comments --- channel.go | 70 ++++++++++++++++++++++++------------------------------ node.go | 3 +-- 2 files changed, 32 insertions(+), 41 deletions(-) diff --git a/channel.go b/channel.go index b90fea04..b39e358d 100644 --- a/channel.go +++ b/channel.go @@ -17,6 +17,8 @@ import ( "google.golang.org/protobuf/reflect/protoreflect" ) +var streamDownErr = status.Error(codes.Unavailable, "stream is down") + type request struct { ctx context.Context msg *Message @@ -66,23 +68,25 @@ func newChannel(n *RawNode) *channel { } func (c *channel) connect(ctx context.Context, conn *grpc.ClientConn) error { - // the parentCtx governs the channel and is used to properly - // shut it down + // parentCtx controls the channel and is used to shut it down c.parentCtx = ctx - // it is important to start the goroutine regardless of a - // successful connection to prevent a deadlock when - // invoking one of the call types. The method provides - // a listener on the sendQ and contains the retry logic + // to prevent deadlock when invoking a call type, + // we need to start the sendMsgs goroutine even + // though the connection has not yet been established. + // The goroutine will block on the sendQ until a + // connection has been established. go c.sendMsgs() return c.tryConnect(conn) } -// creating a stream could fail even though conn != nil due to -// the non-blocking dial. Hence, we need to try to connect to -// the node before starting the receiving goroutine +// create stream and start the receiving goroutine. +// +// Note that the stream could fail even though conn != nil due +// to the non-blocking dial. Hence, we need to try to connect +// to the node before starting the receiving goroutine. func (c *channel) tryConnect(conn *grpc.ClientConn) error { if conn == nil { - // no need to proceed if dial setup failed + // no need to proceed if dial failed return fmt.Errorf("connection is nil") } c.streamMut.Lock() @@ -95,10 +99,10 @@ func (c *channel) tryConnect(conn *grpc.ClientConn) error { return err } c.streamBroken.clear() - // safe guard because creating more than one recvMsgs goroutine is problematic + // guard against creating multiple recvMsgs goroutines if !c.connEstablished.get() { - // connEstablished indicates whether recvMsgs have been started or not and if - // the dial was successful. streamBroken only reports the status of the stream. + // connEstablished indicates dial was successful + // and that recvMsgs have started c.connEstablished.set() go c.recvMsgs() } @@ -109,8 +113,7 @@ func (c *channel) cancelPendingMsgs() { c.responseMut.Lock() defer c.responseMut.Unlock() for msgID, router := range c.responseRouters { - err := status.Errorf(codes.Unavailable, "stream is down") - router.c <- response{nid: c.node.ID(), msg: nil, err: err} + router.c <- response{nid: c.node.ID(), err: streamDownErr} // delete the router if we are only expecting a single message if !router.streaming { delete(c.responseRouters, msgID) @@ -140,7 +143,7 @@ func (c *channel) enqueue(req request, responseChan chan<- response, streaming b // with error if the node is closed select { case <-c.parentCtx.Done(): - c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), msg: nil, err: fmt.Errorf("channel closed")}) + c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: fmt.Errorf("channel closed")}) return case c.sendQ <- req: } @@ -178,12 +181,12 @@ func (c *channel) sendMsg(req request) (err error) { case <-done: // all is good case <-req.ctx.Done(): - // Both channels could be ready at the same time, so we should check 'done' again. + // Both channels could be ready at the same time, so we must check 'done' again. select { case <-done: // false alarm default: - // cause reconnect + // trigger reconnect c.cancelStream() } } @@ -216,15 +219,14 @@ func (c *channel) sendMsgs() { } // return error if stream is broken if c.streamBroken.get() { - err := status.Errorf(codes.Unavailable, "stream is down") - c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), msg: nil, err: err}) + c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: streamDownErr}) continue } // else try to send message err := c.sendMsg(req) if err != nil { // return the error - c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), msg: nil, err: err}) + c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: err}) } } } @@ -238,14 +240,12 @@ func (c *channel) recvMsgs() { c.streamBroken.set() c.streamMut.RUnlock() c.setLastErr(err) - // The only time we reach this point is when the - // stream goes down AFTER a message has been sent and the node - // is waiting for a reply. We thus need to respond with a stream - // is down error on all pending messages. + // we only reach this point when the stream failed AFTER a message + // was sent and we are waiting for a reply. We thus need to respond + // with a stream is down error on all pending messages. c.cancelPendingMsgs() - // attempt to reconnect. It will try to reconnect indefinitely - // or until the node is closed. This is necessary when streaming - // is enabled. + // attempt to reconnect indefinitely until the node is closed. + // This is necessary when streaming is enabled. c.reconnect(-1) } else { c.streamMut.RUnlock() @@ -263,29 +263,21 @@ func (c *channel) recvMsgs() { func (c *channel) tryReconnect() { if !c.connEstablished.get() { - // a connection has never been established; i.e., + // a connection has not yet been established; i.e., // a previous dial attempt could have failed. - // we need to make sure the connection is up. + // try dialing again. err := c.node.dial() if err != nil { c.streamBroken.set() return } - // try to create a stream. Should NOT be - // run if a connection has previously - // been established because it will start - // a recvMsgs goroutine. Otherwise, we - // could suffer from leaking goroutines. - // a guardclause has been added in the - // method to prevent this. err = c.tryConnect(c.node.conn) if err != nil { c.streamBroken.set() return } } - // the node has previously been connected - // but is now disconnected + // the node was previously connected but is now disconnected if c.streamBroken.get() { // try to reconnect only once. // Maybe add this as a user option? diff --git a/node.go b/node.go index bd8c56da..1b7e947d 100644 --- a/node.go +++ b/node.go @@ -76,9 +76,8 @@ func (n *RawNode) connect(mgr *RawManager) error { // dial the node and close the current connection. func (n *RawNode) dial() error { - // dial has previously succeeded but creating a stream failed - // so we need to close it before creating a new. if n.conn != nil { + // close the current connection before dialing again. n.conn.Close() } var err error From de46337543f467c1e406f42189c9e42157f3c9f2 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sun, 24 Mar 2024 14:05:03 +0100 Subject: [PATCH 10/18] fix: typos --- channel_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/channel_test.go b/channel_test.go index 925b1f2b..cf14efa9 100644 --- a/channel_test.go +++ b/channel_test.go @@ -47,7 +47,7 @@ func TestChannelCreation(t *testing.T) { } mgr := dummyMgr() defer mgr.Close() - // a proper connection should NOT be esablished here + // a proper connection should NOT be established here node.connect(mgr) replyChan := make(chan response, 1) @@ -123,7 +123,7 @@ func TestChannelReconnection(t *testing.T) { } mgr := dummyMgr() defer mgr.Close() - // a proper connection should NOT be esablished here because server is not started + // a proper connection should NOT be established here because server is not started node.connect(mgr) // send first message when server is down From ac26307c452ebb93f335fc0ed8fd9e41f737c4b1 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sun, 24 Mar 2024 14:50:49 +0100 Subject: [PATCH 11/18] fix: tweaked error output for TestChannelReconnection --- channel_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/channel_test.go b/channel_test.go index cf14efa9..6ca633a3 100644 --- a/channel_test.go +++ b/channel_test.go @@ -138,13 +138,12 @@ func TestChannelReconnection(t *testing.T) { select { case resp := <-replyChan1: if resp.err == nil { - t.Fatal("should have received an error") + t.Error("response err: got , want error") } case <-time.After(3 * time.Second): t.Fatal("deadlock: impossible to enqueue messages to the node") } - // start the server startServer() // send second message when server is up @@ -159,13 +158,12 @@ func TestChannelReconnection(t *testing.T) { select { case resp := <-replyChan2: if resp.err != nil { - t.Fatal(resp.err) + t.Errorf("response err: got %v, want ", resp.err) } case <-time.After(3 * time.Second): t.Fatal("deadlock: impossible to enqueue messages to the node") } - // stop the server stopServer() // send third message when server has been previously up but is now down @@ -180,7 +178,7 @@ func TestChannelReconnection(t *testing.T) { select { case resp3 := <-replyChan3: if resp3.err == nil { - t.Fatal("should have received an error", resp3.msg) + t.Error("response err: got , want error") } case <-time.After(3 * time.Second): t.Fatal("deadlock: impossible to enqueue messages to the node") From 2dd48d73ad88bb9eec75997550b9ab5f7317e305 Mon Sep 17 00:00:00 2001 From: aleksander-vedvik Date: Mon, 25 Mar 2024 17:54:04 +0100 Subject: [PATCH 12/18] refactor(connection): moved logic to remove confusing naming --- channel.go | 22 ++++++++++------------ node.go | 6 ++---- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/channel.go b/channel.go index b39e358d..9fbb6a65 100644 --- a/channel.go +++ b/channel.go @@ -56,8 +56,8 @@ type channel struct { responseMut sync.Mutex } -func newChannel(n *RawNode) *channel { - return &channel{ +func newChannel(n *RawNode, ctx context.Context) *channel { + c := &channel{ sendQ: make(chan request, n.mgr.opts.sendBuffer), backoffCfg: n.mgr.opts.backoff, node: n, @@ -65,9 +65,6 @@ func newChannel(n *RawNode) *channel { rand: rand.New(rand.NewSource(time.Now().UnixNano())), responseRouters: make(map[uint64]responseRouter), } -} - -func (c *channel) connect(ctx context.Context, conn *grpc.ClientConn) error { // parentCtx controls the channel and is used to shut it down c.parentCtx = ctx // to prevent deadlock when invoking a call type, @@ -76,7 +73,7 @@ func (c *channel) connect(ctx context.Context, conn *grpc.ClientConn) error { // The goroutine will block on the sendQ until a // connection has been established. go c.sendMsgs() - return c.tryConnect(conn) + return c } // create stream and start the receiving goroutine. @@ -84,7 +81,7 @@ func (c *channel) connect(ctx context.Context, conn *grpc.ClientConn) error { // Note that the stream could fail even though conn != nil due // to the non-blocking dial. Hence, we need to try to connect // to the node before starting the receiving goroutine. -func (c *channel) tryConnect(conn *grpc.ClientConn) error { +func (c *channel) createConnection(conn *grpc.ClientConn) error { if conn == nil { // no need to proceed if dial failed return fmt.Errorf("connection is nil") @@ -215,7 +212,7 @@ func (c *channel) sendMsgs() { // have failed or if the node has disconnected if !c.isConnected() { // streamBroken will be set if the reconnection fails - c.tryReconnect() + c.connect() } // return error if stream is broken if c.streamBroken.get() { @@ -261,7 +258,7 @@ func (c *channel) recvMsgs() { } } -func (c *channel) tryReconnect() { +func (c *channel) connect() error { if !c.connEstablished.get() { // a connection has not yet been established; i.e., // a previous dial attempt could have failed. @@ -269,12 +266,12 @@ func (c *channel) tryReconnect() { err := c.node.dial() if err != nil { c.streamBroken.set() - return + return err } - err = c.tryConnect(c.node.conn) + err = c.createConnection(c.node.conn) if err != nil { c.streamBroken.set() - return + return err } } // the node was previously connected but is now disconnected @@ -283,6 +280,7 @@ func (c *channel) tryReconnect() { // Maybe add this as a user option? c.reconnect(1) } + return nil } // reconnect tries to reconnect to the node using an exponential backoff strategy. diff --git a/node.go b/node.go index 1b7e947d..d5cf72ba 100644 --- a/node.go +++ b/node.go @@ -64,11 +64,9 @@ func (n *RawNode) connect(mgr *RawManager) error { if n.mgr.opts.noConnect { return nil } - n.channel = newChannel(n) - // ignoring the error because it will try to reconnect later - _ = n.dial() ctx := n.newContext() - if err := n.channel.connect(ctx, n.conn); err != nil { + n.channel = newChannel(n, ctx) + if err := n.channel.connect(); err != nil { return fmt.Errorf("failed to start stream: %w", err) } return nil From da290f6f9d7e06c1851d0df4ea64e190f31fbc7c Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 26 Mar 2024 14:53:03 +0100 Subject: [PATCH 13/18] chore: renamed some funcs and added docs --- channel.go | 35 ++++++++++++++++++----------------- node.go | 5 ++--- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/channel.go b/channel.go index 9fbb6a65..4d9ca36e 100644 --- a/channel.go +++ b/channel.go @@ -56,7 +56,13 @@ type channel struct { responseMut sync.Mutex } -func newChannel(n *RawNode, ctx context.Context) *channel { +// newChannel creates a new channel for the given node and starts the sending goroutine. +// +// Note that we start the sending goroutine even though the +// connection has not yet been established. This is to prevent +// deadlock when invoking a call type, as the goroutine will +// block on the sendQ until a connection has been established. +func newChannel(ctx context.Context, n *RawNode) *channel { c := &channel{ sendQ: make(chan request, n.mgr.opts.sendBuffer), backoffCfg: n.mgr.opts.backoff, @@ -67,21 +73,16 @@ func newChannel(n *RawNode, ctx context.Context) *channel { } // parentCtx controls the channel and is used to shut it down c.parentCtx = ctx - // to prevent deadlock when invoking a call type, - // we need to start the sendMsgs goroutine even - // though the connection has not yet been established. - // The goroutine will block on the sendQ until a - // connection has been established. - go c.sendMsgs() + go c.sender() return c } -// create stream and start the receiving goroutine. +// newNodeStream creates a stream and starts the receiving goroutine. // // Note that the stream could fail even though conn != nil due // to the non-blocking dial. Hence, we need to try to connect // to the node before starting the receiving goroutine. -func (c *channel) createConnection(conn *grpc.ClientConn) error { +func (c *channel) newNodeStream(conn *grpc.ClientConn) error { if conn == nil { // no need to proceed if dial failed return fmt.Errorf("connection is nil") @@ -96,12 +97,12 @@ func (c *channel) createConnection(conn *grpc.ClientConn) error { return err } c.streamBroken.clear() - // guard against creating multiple recvMsgs goroutines + // guard against creating multiple receiver goroutines if !c.connEstablished.get() { // connEstablished indicates dial was successful - // and that recvMsgs have started + // and that receiver have started c.connEstablished.set() - go c.recvMsgs() + go c.receiver() } return nil } @@ -111,7 +112,7 @@ func (c *channel) cancelPendingMsgs() { defer c.responseMut.Unlock() for msgID, router := range c.responseRouters { router.c <- response{nid: c.node.ID(), err: streamDownErr} - // delete the router if we are only expecting a single message + // delete the router if we are only expecting a single reply message if !router.streaming { delete(c.responseRouters, msgID) } @@ -123,7 +124,7 @@ func (c *channel) routeResponse(msgID uint64, resp response) { defer c.responseMut.Unlock() if router, ok := c.responseRouters[msgID]; ok { router.c <- resp - // delete the router if we are only expecting a single message + // delete the router if we are only expecting a single reply message if !router.streaming { delete(c.responseRouters, msgID) } @@ -200,7 +201,7 @@ func (c *channel) sendMsg(req request) (err error) { return err } -func (c *channel) sendMsgs() { +func (c *channel) sender() { var req request for { select { @@ -228,7 +229,7 @@ func (c *channel) sendMsgs() { } } -func (c *channel) recvMsgs() { +func (c *channel) receiver() { for { resp := newMessage(responseType) c.streamMut.RLock() @@ -268,7 +269,7 @@ func (c *channel) connect() error { c.streamBroken.set() return err } - err = c.createConnection(c.node.conn) + err = c.newNodeStream(c.node.conn) if err != nil { c.streamBroken.set() return err diff --git a/node.go b/node.go index 287dacc8..14aadbef 100644 --- a/node.go +++ b/node.go @@ -65,7 +65,7 @@ func (n *RawNode) connect(mgr *RawManager) error { return nil } ctx := n.newContext() - n.channel = newChannel(n, ctx) + n.channel = newChannel(ctx, n) if err := n.channel.connect(); err != nil { return nodeError{nodeID: n.id, cause: err} } @@ -99,8 +99,7 @@ func (n *RawNode) newContext() context.Context { } var ctx context.Context ctx, n.cancel = context.WithCancel(context.Background()) - ctx = metadata.NewOutgoingContext(ctx, md) - return ctx + return metadata.NewOutgoingContext(ctx, md) } // close this node. From 98fd597b83d13bbcf4a87ceba384cb47d1bac3da Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 26 Mar 2024 14:54:59 +0100 Subject: [PATCH 14/18] chore: call n.newContext() from within newChannel() We can avoid passing the context to channel, since we anyway have access to the relevant RawNode. --- channel.go | 4 ++-- node.go | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/channel.go b/channel.go index 4d9ca36e..a3b1d55f 100644 --- a/channel.go +++ b/channel.go @@ -62,7 +62,7 @@ type channel struct { // connection has not yet been established. This is to prevent // deadlock when invoking a call type, as the goroutine will // block on the sendQ until a connection has been established. -func newChannel(ctx context.Context, n *RawNode) *channel { +func newChannel(n *RawNode) *channel { c := &channel{ sendQ: make(chan request, n.mgr.opts.sendBuffer), backoffCfg: n.mgr.opts.backoff, @@ -72,7 +72,7 @@ func newChannel(ctx context.Context, n *RawNode) *channel { responseRouters: make(map[uint64]responseRouter), } // parentCtx controls the channel and is used to shut it down - c.parentCtx = ctx + c.parentCtx = n.newContext() go c.sender() return c } diff --git a/node.go b/node.go index 14aadbef..0fb65827 100644 --- a/node.go +++ b/node.go @@ -64,8 +64,7 @@ func (n *RawNode) connect(mgr *RawManager) error { if n.mgr.opts.noConnect { return nil } - ctx := n.newContext() - n.channel = newChannel(ctx, n) + n.channel = newChannel(n) if err := n.channel.connect(); err != nil { return nodeError{nodeID: n.id, cause: err} } From 3ded667d8ef8343b912f3b5b0792e7f2a00ee129 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 26 Mar 2024 15:08:16 +0100 Subject: [PATCH 15/18] fix: data race in correctable call type Since we now cancel pending messages, we have introduced a data race that doesn't appear to have been triggered before. --- channel.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/channel.go b/channel.go index a3b1d55f..9429cc11 100644 --- a/channel.go +++ b/channel.go @@ -148,6 +148,8 @@ func (c *channel) enqueue(req request, responseChan chan<- response, streaming b } func (c *channel) deleteRouter(msgID uint64) { + c.responseMut.Lock() + defer c.responseMut.Unlock() delete(c.responseRouters, msgID) } From 7c573fea54663da545e1f16f66c3bef190146203 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 26 Mar 2024 20:15:24 +0100 Subject: [PATCH 16/18] chore: unexported testServerSetup To avoid having to support another test function, let's make it unexported for now. Currently, we only use it in one place; maybe we can harmonize it with some other helper. --- channel_test.go | 2 +- testing_gorums.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/channel_test.go b/channel_test.go index 6ca633a3..e0de5514 100644 --- a/channel_test.go +++ b/channel_test.go @@ -116,7 +116,7 @@ func TestChannelReconnection(t *testing.T) { } srvAddr := "127.0.0.1:5000" // wait to start the server - startServer, stopServer := TestServerSetup(t, srvAddr, dummySrv()) + startServer, stopServer := testServerSetup(t, srvAddr, dummySrv()) node, err := NewRawNode(srvAddr) if err != nil { t.Fatal(err) diff --git a/testing_gorums.go b/testing_gorums.go index c725d5a2..3aee891d 100644 --- a/testing_gorums.go +++ b/testing_gorums.go @@ -39,7 +39,7 @@ func TestSetup(t testing.TB, numServers int, srvFn func(i int) ServerIface) ([]s return addrs, stopFn } -func TestServerSetup(t testing.TB, addr string, srv ServerIface) (func(), func()) { +func testServerSetup(t testing.TB, addr string, srv ServerIface) (func(), func()) { t.Helper() var lis net.Listener var err error From d2815e8947a5ad1c9c3f104fcbc917eceb30a26d Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 26 Mar 2024 20:20:09 +0100 Subject: [PATCH 17/18] fix: removed encoding.RegisterCodec It does not appear to be needed. --- channel_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/channel_test.go b/channel_test.go index e0de5514..419a14c8 100644 --- a/channel_test.go +++ b/channel_test.go @@ -9,7 +9,6 @@ import ( "github.com/relab/gorums/tests/mock" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/encoding" ) type mockSrv struct{} @@ -111,9 +110,6 @@ func TestChannelUnsuccessfulConnection(t *testing.T) { } func TestChannelReconnection(t *testing.T) { - if encoding.GetCodec(ContentSubtype) == nil { - encoding.RegisterCodec(NewCodec()) - } srvAddr := "127.0.0.1:5000" // wait to start the server startServer, stopServer := testServerSetup(t, srvAddr, dummySrv()) From b10575b0730e27e1c716bb802cda7a11ea597e54 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 26 Mar 2024 20:22:41 +0100 Subject: [PATCH 18/18] chore: removed arguments to mockSrv.Test --- channel_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/channel_test.go b/channel_test.go index 419a14c8..8919ca0d 100644 --- a/channel_test.go +++ b/channel_test.go @@ -13,7 +13,7 @@ import ( type mockSrv struct{} -func (mockSrv) Test(ctx ServerCtx, _ *mock.Request) (resp *mock.Response, err error) { +func (mockSrv) Test(_ ServerCtx, _ *mock.Request) (*mock.Response, error) { return nil, nil }