From 5e65a9e51c876c5fc8c3cbc184c2abf52a23f07b Mon Sep 17 00:00:00 2001 From: Hanish Gogada <2924108@FVFG70VQQ05N.local> Date: Sun, 27 Feb 2022 14:58:37 +0100 Subject: [PATCH 01/18] Configuration creation with configurable replica count and wait time --- tests/retry/alltoallconfig_test.go | 81 +++++++++++ tests/retry/sample.pb.go | 223 +++++++++++++++++++++++++++++ tests/retry/sample.proto | 19 +++ tests/retry/sample_gorums.pb.go | 198 +++++++++++++++++++++++++ 4 files changed, 521 insertions(+) create mode 100644 tests/retry/alltoallconfig_test.go create mode 100644 tests/retry/sample.pb.go create mode 100644 tests/retry/sample.proto create mode 100644 tests/retry/sample_gorums.pb.go diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go new file mode 100644 index 00000000..9e3ce744 --- /dev/null +++ b/tests/retry/alltoallconfig_test.go @@ -0,0 +1,81 @@ +package retry + +import ( + "flag" + "fmt" + "log" + "net" + sync "sync" + "testing" + "time" + + gorums "github.com/relab/gorums" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var replicaCount = flag.Int("replicaCount", 10, "number of replicas to create all-to-all communication") +var waitTime = flag.Int("waitTime", 1, "Seconds to wait before forming the configuration") +var wg sync.WaitGroup + +func TestAlltoAllConfiguration(t *testing.T) { + nodeMap := make(map[string]uint32) + idMap := make(map[uint32]string) + for i := 1; i <= *replicaCount; i++ { + address := fmt.Sprintf("%s:%d", "127.0.0.1", 60000+i) + nodeMap[address] = uint32(i) + idMap[uint32(i)] = address + } + wg.Add(*replicaCount) + for i := 1; i <= *replicaCount; i++ { + go startServerAndCreateConfiguration(nodeMap, idMap[uint32(i)], t, *waitTime) + } + wg.Wait() + log.Println("TestCompleted") +} + +type qspec struct{} + +func (q qspec) WriteQCQF(in *WriteRequest, replies map[uint32]*WriteResponse) (*WriteResponse, bool) { + return &WriteResponse{New: true}, true +} + +type replica struct{} + +func (r replica) WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response *WriteResponse, err error) { + return &WriteResponse{New: true}, nil +} + +func startServerAndCreateConfiguration(nodeMap map[string]uint32, + localAddress string, t *testing.T, waitSeconds int) { + lis, err := net.Listen("tcp", localAddress) + if err != nil { + log.Fatalf("Failed to listen on '%s': %v\n", localAddress, err) + t.Fail() + } + replica := replica{} + srv := gorums.NewServer() + RegisterSampleServer(srv, replica) + go func() { + err := srv.Serve(lis) + if err != nil { + log.Printf("Node %s failed to serve\n", localAddress) + t.Fail() + } + }() + time.Sleep(time.Duration(waitSeconds) * time.Second) + mgr := NewManager(gorums.WithDialTimeout(1*time.Second), + gorums.WithGrpcDialOptions( + grpc.WithBlock(), // block until connections are made + grpc.WithTransportCredentials(insecure.NewCredentials()), // disable TLS + ), + ) + qspec := qspec{} + _, err = mgr.NewConfiguration(qspec, gorums.WithNodeMap(nodeMap)) + if err != nil { + log.Printf("unable to create the configuration Error: %v\n", err) + t.Fail() + } + log.Printf("Configuration formed") + wg.Done() +} diff --git a/tests/retry/sample.pb.go b/tests/retry/sample.pb.go new file mode 100644 index 00000000..bdd245d5 --- /dev/null +++ b/tests/retry/sample.pb.go @@ -0,0 +1,223 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.17.3 +// source: sample.proto + +package retry + +import ( + _ "github.com/relab/gorums" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type WriteResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + New bool `protobuf:"varint,1,opt,name=New,proto3" json:"New,omitempty"` +} + +func (x *WriteResponse) Reset() { + *x = WriteResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_sample_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WriteResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteResponse) ProtoMessage() {} + +func (x *WriteResponse) ProtoReflect() protoreflect.Message { + mi := &file_sample_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteResponse.ProtoReflect.Descriptor instead. +func (*WriteResponse) Descriptor() ([]byte, []int) { + return file_sample_proto_rawDescGZIP(), []int{0} +} + +func (x *WriteResponse) GetNew() bool { + if x != nil { + return x.New + } + return false +} + +type WriteRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=Value,proto3" json:"Value,omitempty"` +} + +func (x *WriteRequest) Reset() { + *x = WriteRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_sample_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WriteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteRequest) ProtoMessage() {} + +func (x *WriteRequest) ProtoReflect() protoreflect.Message { + mi := &file_sample_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteRequest.ProtoReflect.Descriptor instead. +func (*WriteRequest) Descriptor() ([]byte, []int) { + return file_sample_proto_rawDescGZIP(), []int{1} +} + +func (x *WriteRequest) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *WriteRequest) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +var File_sample_proto protoreflect.FileDescriptor + +var file_sample_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, + 0x72, 0x65, 0x74, 0x72, 0x79, 0x1a, 0x0c, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0x21, 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x4e, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x03, 0x4e, 0x65, 0x77, 0x22, 0x36, 0x0a, 0x0c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x44, + 0x0a, 0x06, 0x53, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x57, 0x72, 0x69, 0x74, + 0x65, 0x51, 0x43, 0x12, 0x13, 0x2e, 0x72, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x57, 0x72, 0x69, 0x74, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x72, 0x65, 0x74, 0x72, 0x79, + 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x04, + 0xa0, 0xb5, 0x18, 0x01, 0x42, 0x25, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x62, 0x2f, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2f, + 0x74, 0x65, 0x73, 0x74, 0x73, 0x2f, 0x72, 0x65, 0x74, 0x72, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var ( + file_sample_proto_rawDescOnce sync.Once + file_sample_proto_rawDescData = file_sample_proto_rawDesc +) + +func file_sample_proto_rawDescGZIP() []byte { + file_sample_proto_rawDescOnce.Do(func() { + file_sample_proto_rawDescData = protoimpl.X.CompressGZIP(file_sample_proto_rawDescData) + }) + return file_sample_proto_rawDescData +} + +var file_sample_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_sample_proto_goTypes = []interface{}{ + (*WriteResponse)(nil), // 0: retry.WriteResponse + (*WriteRequest)(nil), // 1: retry.WriteRequest +} +var file_sample_proto_depIdxs = []int32{ + 1, // 0: retry.Sample.WriteQC:input_type -> retry.WriteRequest + 0, // 1: retry.Sample.WriteQC:output_type -> retry.WriteResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_sample_proto_init() } +func file_sample_proto_init() { + if File_sample_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_sample_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WriteResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sample_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WriteRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_sample_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_sample_proto_goTypes, + DependencyIndexes: file_sample_proto_depIdxs, + MessageInfos: file_sample_proto_msgTypes, + }.Build() + File_sample_proto = out.File + file_sample_proto_rawDesc = nil + file_sample_proto_goTypes = nil + file_sample_proto_depIdxs = nil +} diff --git a/tests/retry/sample.proto b/tests/retry/sample.proto new file mode 100644 index 00000000..098ea893 --- /dev/null +++ b/tests/retry/sample.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package retry; + +option go_package = "github.com/relab/gorums/tests/retry"; + +import "gorums.proto"; + +service Sample { + rpc WriteQC(WriteRequest) returns (WriteResponse) { + option (gorums.quorumcall) = true; + } +} + +message WriteResponse { bool New = 1; } +message WriteRequest { + string Key = 1; + string Value = 2; +} \ No newline at end of file diff --git a/tests/retry/sample_gorums.pb.go b/tests/retry/sample_gorums.pb.go new file mode 100644 index 00000000..e9983994 --- /dev/null +++ b/tests/retry/sample_gorums.pb.go @@ -0,0 +1,198 @@ +// Code generated by protoc-gen-gorums. DO NOT EDIT. +// versions: +// protoc-gen-gorums v0.7.0-devel +// protoc v3.17.3 +// source: sample.proto + +package retry + +import ( + context "context" + fmt "fmt" + gorums "github.com/relab/gorums" + encoding "google.golang.org/grpc/encoding" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = gorums.EnforceVersion(7 - gorums.MinVersion) + // Verify that the gorums runtime is sufficiently up-to-date. + _ = gorums.EnforceVersion(gorums.MaxVersion - 7) +) + +// A Configuration represents a static set of nodes on which quorum remote +// procedure calls may be invoked. +type Configuration struct { + gorums.RawConfiguration + nodes []*Node + qspec QuorumSpec +} + +// ConfigurationFromRaw returns a new Configuration from the given raw configuration and QuorumSpec. +// +// This function may for example be used to "clone" a configuration but install a different QuorumSpec: +// cfg1, err := mgr.NewConfiguration(qspec1, opts...) +// cfg2 := ConfigurationFromRaw(cfg1.RawConfig, qspec2) +func ConfigurationFromRaw(rawCfg gorums.RawConfiguration, qspec QuorumSpec) *Configuration { + // return an error if the QuorumSpec interface is not empty and no implementation was provided. + var test interface{} = struct{}{} + if _, empty := test.(QuorumSpec); !empty && qspec == nil { + panic("QuorumSpec may not be nil") + } + return &Configuration{ + RawConfiguration: rawCfg, + qspec: qspec, + } +} + +// Nodes returns a slice of each available node. IDs are returned in the same +// order as they were provided in the creation of the Manager. +// +// NOTE: mutating the returned slice is not supported. +func (c *Configuration) Nodes() []*Node { + if c.nodes == nil { + c.nodes = make([]*Node, 0, c.Size()) + for _, n := range c.RawConfiguration { + c.nodes = append(c.nodes, &Node{n}) + } + } + return c.nodes +} + +// And returns a NodeListOption that can be used to create a new configuration combining c and d. +func (c Configuration) And(d *Configuration) gorums.NodeListOption { + return c.RawConfiguration.And(d.RawConfiguration) +} + +// Except returns a NodeListOption that can be used to create a new configuration +// from c without the nodes in rm. +func (c Configuration) Except(rm *Configuration) gorums.NodeListOption { + return c.RawConfiguration.Except(rm.RawConfiguration) +} + +func init() { + if encoding.GetCodec(gorums.ContentSubtype) == nil { + encoding.RegisterCodec(gorums.NewCodec()) + } +} + +// Manager maintains a connection pool of nodes on +// which quorum calls can be performed. +type Manager struct { + *gorums.RawManager +} + +// NewManager returns a new Manager for managing connection to nodes added +// to the manager. This function accepts manager options used to configure +// various aspects of the manager. +func NewManager(opts ...gorums.ManagerOption) (mgr *Manager) { + mgr = &Manager{} + mgr.RawManager = gorums.NewRawManager(opts...) + return mgr +} + +// NewConfiguration returns a configuration based on the provided list of nodes (required) +// and an optional quorum specification. The QuorumSpec is necessary for call types that +// must process replies. For configurations only used for unicast or multicast call types, +// a QuorumSpec is not needed. The QuorumSpec interface is also a ConfigOption. +// Nodes can be supplied using WithNodeMap or WithNodeList, or WithNodeIDs. +// A new configuration can also be created from an existing configuration, +// using the And, WithNewNodes, Except, and WithoutNodes methods. +func (m *Manager) NewConfiguration(opts ...gorums.ConfigOption) (c *Configuration, err error) { + if len(opts) < 1 || len(opts) > 2 { + return nil, fmt.Errorf("wrong number of options: %d", len(opts)) + } + c = &Configuration{} + for _, opt := range opts { + switch v := opt.(type) { + case gorums.NodeListOption: + c.RawConfiguration, err = gorums.NewRawConfiguration(m.RawManager, v) + if err != nil { + return nil, err + } + case QuorumSpec: + // Must be last since v may match QuorumSpec if it is interface{} + c.qspec = v + default: + return nil, fmt.Errorf("unknown option type: %v", v) + } + } + // return an error if the QuorumSpec interface is not empty and no implementation was provided. + var test interface{} = struct{}{} + if _, empty := test.(QuorumSpec); !empty && c.qspec == nil { + return nil, fmt.Errorf("missing required QuorumSpec") + } + return c, nil +} + +// Nodes returns a slice of available nodes on this manager. +// IDs are returned in the order they were added at creation of the manager. +func (m *Manager) Nodes() []*Node { + gorumsNodes := m.RawManager.Nodes() + nodes := make([]*Node, 0, len(gorumsNodes)) + for _, n := range gorumsNodes { + nodes = append(nodes, &Node{n}) + } + return nodes +} + +// Node encapsulates the state of a node on which a remote procedure call +// can be performed. +type Node struct { + *gorums.RawNode +} + +// QuorumSpec is the interface of quorum functions for Sample. +type QuorumSpec interface { + gorums.ConfigOption + + // WriteQCQF is the quorum function for the WriteQC + // quorum call method. The in parameter is the request object + // supplied to the WriteQC method at call time, and may or may not + // be used by the quorum function. If the in parameter is not needed + // you should implement your quorum function with '_ *WriteRequest'. + WriteQCQF(in *WriteRequest, replies map[uint32]*WriteResponse) (*WriteResponse, bool) +} + +// WriteQC is a quorum call invoked on all nodes in configuration c, +// with the same argument in, and returns a combined result. +func (c *Configuration) WriteQC(ctx context.Context, in *WriteRequest) (resp *WriteResponse, err error) { + cd := gorums.QuorumCallData{ + Message: in, + Method: "retry.Sample.WriteQC", + } + cd.QuorumFunction = func(req protoreflect.ProtoMessage, replies map[uint32]protoreflect.ProtoMessage) (protoreflect.ProtoMessage, bool) { + r := make(map[uint32]*WriteResponse, len(replies)) + for k, v := range replies { + r[k] = v.(*WriteResponse) + } + return c.qspec.WriteQCQF(req.(*WriteRequest), r) + } + + res, err := c.RawConfiguration.QuorumCall(ctx, cd) + if err != nil { + return nil, err + } + return res.(*WriteResponse), err +} + +// Sample is the server-side API for the Sample Service +type Sample interface { + WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response *WriteResponse, err error) +} + +func RegisterSampleServer(srv *gorums.Server, impl Sample) { + srv.RegisterHandler("retry.Sample.WriteQC", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { + req := in.Message.(*WriteRequest) + defer ctx.Release() + resp, err := impl.WriteQC(ctx, req) + gorums.SendMessage(ctx, finished, gorums.WrapMessage(in.Metadata, resp, err)) + }) +} + +type internalWriteResponse struct { + nid uint32 + reply *WriteResponse + err error +} From 529245801efc5e6746d4cc37a96afa906d00a7b2 Mon Sep 17 00:00:00 2001 From: Hanish Gogada <2924108@FVFG70VQQ05N.local> Date: Sun, 27 Feb 2022 18:11:46 +0100 Subject: [PATCH 02/18] Modified and added one more style of configuration creation --- tests/retry/alltoallconfig_test.go | 102 ++++++++++++++++++++++------- 1 file changed, 78 insertions(+), 24 deletions(-) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index 9e3ce744..00d18ddb 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -14,24 +14,61 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var replicaCount = flag.Int("replicaCount", 10, "number of replicas to create all-to-all communication") -var waitTime = flag.Int("waitTime", 1, "Seconds to wait before forming the configuration") +var replicaCount = flag.Int("replicaCount", 80, "number of replicas to create all-to-all communication") +var waitTime = flag.Int("waitTime", 2, "Seconds to wait before forming the configuration") var wg sync.WaitGroup -func TestAlltoAllConfiguration(t *testing.T) { +func TestAlltoAllConfigurationStyle1(t *testing.T) { nodeMap := make(map[string]uint32) - idMap := make(map[uint32]string) + replicaList := make([]*replica, 0) for i := 1; i <= *replicaCount; i++ { - address := fmt.Sprintf("%s:%d", "127.0.0.1", 60000+i) + address := fmt.Sprintf("%s:%d", "127.0.0.1", 50000+i) nodeMap[address] = uint32(i) - idMap[uint32(i)] = address + replica := replica{ + address: address, + id: uint32(i), + } + replicaList = append(replicaList, &replica) } wg.Add(*replicaCount) - for i := 1; i <= *replicaCount; i++ { - go startServerAndCreateConfiguration(nodeMap, idMap[uint32(i)], t, *waitTime) + for _, replica := range replicaList { + go replica.startServerAndCreateConfig(nodeMap, t) + } + wg.Wait() + log.Println("TestAlltoAllConfigurationStyle1 test completed") +} + +func TestAlltoAllConfigurationStyle2(t *testing.T) { + replicas := createReplicas() + nodeMap := make(map[string]uint32) + + for _, replica := range replicas { + nodeMap[replica.address] = replica.id + } + wg.Add(*replicaCount) + for _, replica := range replicas { + go replica.createConfiguration(nodeMap, t) } wg.Wait() - log.Println("TestCompleted") + log.Println("TestAlltoAllConfigurationStyle2 test completed") + +} + +func createReplicas() []*replica { + replicas := make([]*replica, 0) + for i := 1; i <= *replicaCount; i++ { + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil + } + replica := replica{ + address: lis.Addr().String(), + id: uint32(i), + lis: lis, + } + replicas = append(replicas, &replica) + } + return replicas } type qspec struct{} @@ -40,30 +77,29 @@ func (q qspec) WriteQCQF(in *WriteRequest, replies map[uint32]*WriteResponse) (* return &WriteResponse{New: true}, true } -type replica struct{} +type replica struct { + address string + lis net.Listener + id uint32 + server *gorums.Server +} func (r replica) WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response *WriteResponse, err error) { return &WriteResponse{New: true}, nil } -func startServerAndCreateConfiguration(nodeMap map[string]uint32, - localAddress string, t *testing.T, waitSeconds int) { - lis, err := net.Listen("tcp", localAddress) - if err != nil { - log.Fatalf("Failed to listen on '%s': %v\n", localAddress, err) - t.Fail() - } - replica := replica{} +func (r *replica) createConfiguration(nodeMap map[string]uint32, t *testing.T) { srv := gorums.NewServer() - RegisterSampleServer(srv, replica) + r.server = srv + RegisterSampleServer(srv, r) go func() { - err := srv.Serve(lis) + err := srv.Serve(r.lis) if err != nil { - log.Printf("Node %s failed to serve\n", localAddress) + log.Printf("Node %s failed to serve\n", r.address) t.Fail() } }() - time.Sleep(time.Duration(waitSeconds) * time.Second) + time.Sleep(time.Duration(*waitTime) * time.Second) mgr := NewManager(gorums.WithDialTimeout(1*time.Second), gorums.WithGrpcDialOptions( grpc.WithBlock(), // block until connections are made @@ -71,11 +107,29 @@ func startServerAndCreateConfiguration(nodeMap map[string]uint32, ), ) qspec := qspec{} - _, err = mgr.NewConfiguration(qspec, gorums.WithNodeMap(nodeMap)) + _, err := mgr.NewConfiguration(qspec, gorums.WithNodeMap(nodeMap)) if err != nil { log.Printf("unable to create the configuration Error: %v\n", err) t.Fail() } - log.Printf("Configuration formed") wg.Done() + +} + +func (r *replica) startListener(t *testing.T) { + lis, err := net.Listen("tcp", r.address) + if err != nil { + log.Fatalf("Failed to listen on '%s': %v\n", r.address, err) + t.Fail() + } + r.lis = lis +} + +func (r *replica) startServerAndCreateConfig(nodeMap map[string]uint32, t *testing.T) { + r.startListener(t) + r.createConfiguration(nodeMap, t) +} + +func (r *replica) stopServer() { + r.server.Stop() } From 5ea8e862dc1a29629ae071e6dfdfe69c38669490 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sun, 27 Feb 2022 19:13:48 +0100 Subject: [PATCH 03/18] Mostly minor tweaks --- tests/retry/alltoallconfig_test.go | 33 ++++++++++++++---------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index 00d18ddb..404d3b84 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -5,7 +5,7 @@ import ( "fmt" "log" "net" - sync "sync" + "sync" "testing" "time" @@ -14,11 +14,13 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var replicaCount = flag.Int("replicaCount", 80, "number of replicas to create all-to-all communication") -var waitTime = flag.Int("waitTime", 2, "Seconds to wait before forming the configuration") -var wg sync.WaitGroup +var ( + replicaCount = flag.Int("replicaCount", 80, "number of replicas to create all-to-all communication") + waitTime = flag.Int("waitTime", 2, "Seconds to wait before forming the configuration") + wg sync.WaitGroup +) -func TestAlltoAllConfigurationStyle1(t *testing.T) { +func TestAllToAllConfigurationStyle1(t *testing.T) { nodeMap := make(map[string]uint32) replicaList := make([]*replica, 0) for i := 1; i <= *replicaCount; i++ { @@ -35,13 +37,12 @@ func TestAlltoAllConfigurationStyle1(t *testing.T) { go replica.startServerAndCreateConfig(nodeMap, t) } wg.Wait() - log.Println("TestAlltoAllConfigurationStyle1 test completed") + log.Println("TestAllToAllConfigurationStyle1 test completed") } -func TestAlltoAllConfigurationStyle2(t *testing.T) { +func TestAllToAllConfigurationStyle2(t *testing.T) { replicas := createReplicas() nodeMap := make(map[string]uint32) - for _, replica := range replicas { nodeMap[replica.address] = replica.id } @@ -50,8 +51,7 @@ func TestAlltoAllConfigurationStyle2(t *testing.T) { go replica.createConfiguration(nodeMap, t) } wg.Wait() - log.Println("TestAlltoAllConfigurationStyle2 test completed") - + log.Println("TestAllToAllConfigurationStyle2 test completed") } func createReplicas() []*replica { @@ -93,9 +93,8 @@ func (r *replica) createConfiguration(nodeMap map[string]uint32, t *testing.T) { r.server = srv RegisterSampleServer(srv, r) go func() { - err := srv.Serve(r.lis) - if err != nil { - log.Printf("Node %s failed to serve\n", r.address) + if err := srv.Serve(r.lis); err != nil { + log.Printf("Node %s failed to serve: %v\n", r.address, err) t.Fail() } }() @@ -109,18 +108,16 @@ func (r *replica) createConfiguration(nodeMap map[string]uint32, t *testing.T) { qspec := qspec{} _, err := mgr.NewConfiguration(qspec, gorums.WithNodeMap(nodeMap)) if err != nil { - log.Printf("unable to create the configuration Error: %v\n", err) - t.Fail() + t.Fatalf("Failed to create the configuration: %v", err) } wg.Done() - } func (r *replica) startListener(t *testing.T) { + t.Helper() lis, err := net.Listen("tcp", r.address) if err != nil { - log.Fatalf("Failed to listen on '%s': %v\n", r.address, err) - t.Fail() + t.Fatalf("Failed to listen on '%s': %v", r.address, err) } r.lis = lis } From 887ec3699428e1d04a1be6e1ec00ffa3f5524df9 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 1 Mar 2022 17:23:12 +0100 Subject: [PATCH 04/18] Replaced waitgroup with errgroup and errChan This fixes various problems with error reporting when running tests with goroutines. Unfortunately, it still seems to fail for Style1 for any replica count, but Style2 does not fail even for replica counts up to 180. --- tests/retry/alltoallconfig_test.go | 55 +++++++++++++++++------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index 404d3b84..2a079898 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -3,13 +3,12 @@ package retry import ( "flag" "fmt" - "log" "net" - "sync" "testing" "time" gorums "github.com/relab/gorums" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -17,7 +16,6 @@ import ( var ( replicaCount = flag.Int("replicaCount", 80, "number of replicas to create all-to-all communication") waitTime = flag.Int("waitTime", 2, "Seconds to wait before forming the configuration") - wg sync.WaitGroup ) func TestAllToAllConfigurationStyle1(t *testing.T) { @@ -32,12 +30,21 @@ func TestAllToAllConfigurationStyle1(t *testing.T) { } replicaList = append(replicaList, &replica) } - wg.Add(*replicaCount) + g := new(errgroup.Group) for _, replica := range replicaList { - go replica.startServerAndCreateConfig(nodeMap, t) + replica := replica + g.Go(func() error { + err := replica.startListener() + if err != nil { + return err + } + return replica.createConfiguration(nodeMap) + }) } - wg.Wait() - log.Println("TestAllToAllConfigurationStyle1 test completed") + if err := g.Wait(); err != nil { + t.Fatal(err) + } + t.Log("Successful TestAllToAllConfigurationStyle1 completion") } func TestAllToAllConfigurationStyle2(t *testing.T) { @@ -46,12 +53,15 @@ func TestAllToAllConfigurationStyle2(t *testing.T) { for _, replica := range replicas { nodeMap[replica.address] = replica.id } - wg.Add(*replicaCount) + g := new(errgroup.Group) for _, replica := range replicas { - go replica.createConfiguration(nodeMap, t) + replica := replica + g.Go(func() error { return replica.createConfiguration(nodeMap) }) + } + if err := g.Wait(); err != nil { + t.Fatal(err) } - wg.Wait() - log.Println("TestAllToAllConfigurationStyle2 test completed") + t.Log("Successful TestAllToAllConfigurationStyle2 completion") } func createReplicas() []*replica { @@ -88,14 +98,14 @@ func (r replica) WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response return &WriteResponse{New: true}, nil } -func (r *replica) createConfiguration(nodeMap map[string]uint32, t *testing.T) { +func (r *replica) createConfiguration(nodeMap map[string]uint32) error { srv := gorums.NewServer() r.server = srv RegisterSampleServer(srv, r) + errChan := make(chan error) go func() { if err := srv.Serve(r.lis); err != nil { - log.Printf("Node %s failed to serve: %v\n", r.address, err) - t.Fail() + errChan <- fmt.Errorf("failed to serve at %q: %w", r.address, err) } }() time.Sleep(time.Duration(*waitTime) * time.Second) @@ -108,23 +118,20 @@ func (r *replica) createConfiguration(nodeMap map[string]uint32, t *testing.T) { qspec := qspec{} _, err := mgr.NewConfiguration(qspec, gorums.WithNodeMap(nodeMap)) if err != nil { - t.Fatalf("Failed to create the configuration: %v", err) + return err } - wg.Done() + close(errChan) + // since errChan is closed, this should either return an error or nil + return <-errChan } -func (r *replica) startListener(t *testing.T) { - t.Helper() +func (r *replica) startListener() error { lis, err := net.Listen("tcp", r.address) if err != nil { - t.Fatalf("Failed to listen on '%s': %v", r.address, err) + return fmt.Errorf("failed to listen at %q: %w", r.address, err) } r.lis = lis -} - -func (r *replica) startServerAndCreateConfig(nodeMap map[string]uint32, t *testing.T) { - r.startListener(t) - r.createConfiguration(nodeMap, t) + return nil } func (r *replica) stopServer() { From fdd4ffce1eb5a5851e9b388f3f0333635ef17b90 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 1 Mar 2022 17:47:45 +0100 Subject: [PATCH 05/18] Fixed error handling in createReplicas --- tests/retry/alltoallconfig_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index 2a079898..a84b6a62 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - gorums "github.com/relab/gorums" + "github.com/relab/gorums" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -48,7 +48,10 @@ func TestAllToAllConfigurationStyle1(t *testing.T) { } func TestAllToAllConfigurationStyle2(t *testing.T) { - replicas := createReplicas() + replicas, err := createReplicas() + if err != nil { + t.Fatal(err) + } nodeMap := make(map[string]uint32) for _, replica := range replicas { nodeMap[replica.address] = replica.id @@ -64,12 +67,12 @@ func TestAllToAllConfigurationStyle2(t *testing.T) { t.Log("Successful TestAllToAllConfigurationStyle2 completion") } -func createReplicas() []*replica { +func createReplicas() ([]*replica, error) { replicas := make([]*replica, 0) for i := 1; i <= *replicaCount; i++ { lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { - return nil + return nil, err } replica := replica{ address: lis.Addr().String(), @@ -78,7 +81,7 @@ func createReplicas() []*replica { } replicas = append(replicas, &replica) } - return replicas + return replicas, nil } type qspec struct{} From 9f32140f99a7c967a5e2100ca2e7a93471963279 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 1 Mar 2022 18:10:11 +0100 Subject: [PATCH 06/18] Fixed too many open files problem To avoid that the process create to many connections (open files) each test should stop their servers. --- tests/retry/alltoallconfig_test.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index a84b6a62..4cbd375e 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -20,7 +20,7 @@ var ( func TestAllToAllConfigurationStyle1(t *testing.T) { nodeMap := make(map[string]uint32) - replicaList := make([]*replica, 0) + replicas := make([]*replica, 0) for i := 1; i <= *replicaCount; i++ { address := fmt.Sprintf("%s:%d", "127.0.0.1", 50000+i) nodeMap[address] = uint32(i) @@ -28,10 +28,15 @@ func TestAllToAllConfigurationStyle1(t *testing.T) { address: address, id: uint32(i), } - replicaList = append(replicaList, &replica) + replicas = append(replicas, &replica) } + defer func() { + for _, replica := range replicas { + replica.stopServer() + } + }() g := new(errgroup.Group) - for _, replica := range replicaList { + for _, replica := range replicas { replica := replica g.Go(func() error { err := replica.startListener() @@ -52,6 +57,11 @@ func TestAllToAllConfigurationStyle2(t *testing.T) { if err != nil { t.Fatal(err) } + defer func() { + for _, replica := range replicas { + replica.stopServer() + } + }() nodeMap := make(map[string]uint32) for _, replica := range replicas { nodeMap[replica.address] = replica.id From d01de1eb7cf6f743c2f7c4d5d7d8d779abb74a61 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Fri, 4 Mar 2022 07:45:45 -0500 Subject: [PATCH 07/18] A few more attempts to fix the number of all-to-all connections This still contains the old versions that don't work, but I've disabled them. Things should be cleaned up now that the new version (seems to) work. Have been able to start 200 servers. --- tests/retry/alltoallconfig_test.go | 92 ++++++++++++++++++++++-------- 1 file changed, 67 insertions(+), 25 deletions(-) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index 4cbd375e..add9b3cd 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -13,12 +13,12 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var ( - replicaCount = flag.Int("replicaCount", 80, "number of replicas to create all-to-all communication") - waitTime = flag.Int("waitTime", 2, "Seconds to wait before forming the configuration") -) +var replicaCount = flag.Int("replicas", 20, "number of replicas to create all-to-all communication") + +// waitTime = flag.Int("wait", 100, "milliseconds to wait before dialing the configuration replicas") +// timeout = flag.Duration("timeout", 100*time.Millisecond, "duration to wait before dialing the configuration replicas") -func TestAllToAllConfigurationStyle1(t *testing.T) { +func disabledTestAllToAllConfigurationStyle1(t *testing.T) { nodeMap := make(map[string]uint32) replicas := make([]*replica, 0) for i := 1; i <= *replicaCount; i++ { @@ -52,7 +52,7 @@ func TestAllToAllConfigurationStyle1(t *testing.T) { t.Log("Successful TestAllToAllConfigurationStyle1 completion") } -func TestAllToAllConfigurationStyle2(t *testing.T) { +func disabledTestAllToAllConfigurationStyle2(t *testing.T) { replicas, err := createReplicas() if err != nil { t.Fatal(err) @@ -77,8 +77,43 @@ func TestAllToAllConfigurationStyle2(t *testing.T) { t.Log("Successful TestAllToAllConfigurationStyle2 completion") } +func TestAllToAllConfigurationStyle3(t *testing.T) { + srvs := make([]*replica, *replicaCount) + for i := range srvs { + srvs[i] = &replica{} + } + addrs, closeServers := gorums.TestSetup(t, *replicaCount, func(i int) gorums.ServerIface { + srv := gorums.NewServer() + RegisterSampleServer(srv, srvs[i]) + return srv + }) + for i := range srvs { + srvs[i].address = addrs[i] + srvs[i].id = uint32(i) + } + nodeMap := make(map[string]uint32) + for _, replica := range srvs { + nodeMap[replica.address] = replica.id + } + for _, replica := range srvs { + replica.createConfiguration(nodeMap) + } + + teardown := func() { + for _, replica := range srvs { + // replica.stopServer() + replica.mgr.Close() + } + closeServers() + } + teardown() + t.Log("Successful TestAllToAllConfigurationStyle3 completion") +} + func createReplicas() ([]*replica, error) { replicas := make([]*replica, 0) + errChan := make(chan error, *replicaCount) + defer close(errChan) for i := 1; i <= *replicaCount; i++ { lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -88,9 +123,27 @@ func createReplicas() ([]*replica, error) { address: lis.Addr().String(), id: uint32(i), lis: lis, + server: gorums.NewServer(), } + RegisterSampleServer(replica.server, replica) replicas = append(replicas, &replica) } + + for _, replica := range replicas { + replica := replica + go func() { + if err := replica.serve(); err != nil { + errChan <- fmt.Errorf("failed to serve at %q: %w", replica.address, err) + } + }() + } + + select { + case err := <-errChan: + return nil, err + case <-time.After(1000 * time.Millisecond): + // slept for a bit to allow replica serve goroutines to fail + } return replicas, nil } @@ -105,6 +158,7 @@ type replica struct { lis net.Listener id uint32 server *gorums.Server + mgr *Manager } func (r replica) WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response *WriteResponse, err error) { @@ -112,30 +166,14 @@ func (r replica) WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response } func (r *replica) createConfiguration(nodeMap map[string]uint32) error { - srv := gorums.NewServer() - r.server = srv - RegisterSampleServer(srv, r) - errChan := make(chan error) - go func() { - if err := srv.Serve(r.lis); err != nil { - errChan <- fmt.Errorf("failed to serve at %q: %w", r.address, err) - } - }() - time.Sleep(time.Duration(*waitTime) * time.Second) - mgr := NewManager(gorums.WithDialTimeout(1*time.Second), + r.mgr = NewManager(gorums.WithDialTimeout(100*time.Millisecond), gorums.WithGrpcDialOptions( grpc.WithBlock(), // block until connections are made grpc.WithTransportCredentials(insecure.NewCredentials()), // disable TLS ), ) - qspec := qspec{} - _, err := mgr.NewConfiguration(qspec, gorums.WithNodeMap(nodeMap)) - if err != nil { - return err - } - close(errChan) - // since errChan is closed, this should either return an error or nil - return <-errChan + _, err := r.mgr.NewConfiguration(qspec{}, gorums.WithNodeMap(nodeMap)) + return err } func (r *replica) startListener() error { @@ -147,6 +185,10 @@ func (r *replica) startListener() error { return nil } +func (r *replica) serve() error { + return r.server.Serve(r.lis) +} + func (r *replica) stopServer() { r.server.Stop() } From 936e44c62c6921ecd69e6b747b1dc29572a2208e Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Mon, 7 Mar 2022 13:06:31 -0500 Subject: [PATCH 08/18] Fixed problem with style2 --- tests/retry/alltoallconfig_test.go | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index add9b3cd..2e94fed6 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -52,7 +52,7 @@ func disabledTestAllToAllConfigurationStyle1(t *testing.T) { t.Log("Successful TestAllToAllConfigurationStyle1 completion") } -func disabledTestAllToAllConfigurationStyle2(t *testing.T) { +func TestAllToAllConfigurationStyle2(t *testing.T) { replicas, err := createReplicas() if err != nil { t.Fatal(err) @@ -66,15 +66,9 @@ func disabledTestAllToAllConfigurationStyle2(t *testing.T) { for _, replica := range replicas { nodeMap[replica.address] = replica.id } - g := new(errgroup.Group) for _, replica := range replicas { - replica := replica - g.Go(func() error { return replica.createConfiguration(nodeMap) }) - } - if err := g.Wait(); err != nil { - t.Fatal(err) + replica.createConfiguration(nodeMap) } - t.Log("Successful TestAllToAllConfigurationStyle2 completion") } func TestAllToAllConfigurationStyle3(t *testing.T) { @@ -87,12 +81,10 @@ func TestAllToAllConfigurationStyle3(t *testing.T) { RegisterSampleServer(srv, srvs[i]) return srv }) - for i := range srvs { + nodeMap := make(map[string]uint32) + for i, replica := range srvs { srvs[i].address = addrs[i] srvs[i].id = uint32(i) - } - nodeMap := make(map[string]uint32) - for _, replica := range srvs { nodeMap[replica.address] = replica.id } for _, replica := range srvs { @@ -101,13 +93,11 @@ func TestAllToAllConfigurationStyle3(t *testing.T) { teardown := func() { for _, replica := range srvs { - // replica.stopServer() replica.mgr.Close() } closeServers() } teardown() - t.Log("Successful TestAllToAllConfigurationStyle3 completion") } func createReplicas() ([]*replica, error) { @@ -127,10 +117,6 @@ func createReplicas() ([]*replica, error) { } RegisterSampleServer(replica.server, replica) replicas = append(replicas, &replica) - } - - for _, replica := range replicas { - replica := replica go func() { if err := replica.serve(); err != nil { errChan <- fmt.Errorf("failed to serve at %q: %w", replica.address, err) @@ -190,5 +176,6 @@ func (r *replica) serve() error { } func (r *replica) stopServer() { + r.mgr.Close() r.server.Stop() } From c1c56cbd56ff236a59e3d58449dccb2adbae0aab Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Mon, 7 Mar 2022 13:14:52 -0500 Subject: [PATCH 09/18] Removed Style1 due to ordering problems --- tests/retry/alltoallconfig_test.go | 38 ------------------------------ 1 file changed, 38 deletions(-) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index 2e94fed6..bf4a6a2a 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -8,50 +8,12 @@ import ( "time" "github.com/relab/gorums" - "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) var replicaCount = flag.Int("replicas", 20, "number of replicas to create all-to-all communication") -// waitTime = flag.Int("wait", 100, "milliseconds to wait before dialing the configuration replicas") -// timeout = flag.Duration("timeout", 100*time.Millisecond, "duration to wait before dialing the configuration replicas") - -func disabledTestAllToAllConfigurationStyle1(t *testing.T) { - nodeMap := make(map[string]uint32) - replicas := make([]*replica, 0) - for i := 1; i <= *replicaCount; i++ { - address := fmt.Sprintf("%s:%d", "127.0.0.1", 50000+i) - nodeMap[address] = uint32(i) - replica := replica{ - address: address, - id: uint32(i), - } - replicas = append(replicas, &replica) - } - defer func() { - for _, replica := range replicas { - replica.stopServer() - } - }() - g := new(errgroup.Group) - for _, replica := range replicas { - replica := replica - g.Go(func() error { - err := replica.startListener() - if err != nil { - return err - } - return replica.createConfiguration(nodeMap) - }) - } - if err := g.Wait(); err != nil { - t.Fatal(err) - } - t.Log("Successful TestAllToAllConfigurationStyle1 completion") -} - func TestAllToAllConfigurationStyle2(t *testing.T) { replicas, err := createReplicas() if err != nil { From 07ed907d449e961dde6da88136cc41cf8474a161 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Mon, 7 Mar 2022 13:18:21 -0500 Subject: [PATCH 10/18] Removed unused startListener() function --- tests/retry/alltoallconfig_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index bf4a6a2a..95ef5216 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -124,15 +124,6 @@ func (r *replica) createConfiguration(nodeMap map[string]uint32) error { return err } -func (r *replica) startListener() error { - lis, err := net.Listen("tcp", r.address) - if err != nil { - return fmt.Errorf("failed to listen at %q: %w", r.address, err) - } - r.lis = lis - return nil -} - func (r *replica) serve() error { return r.server.Serve(r.lis) } From 400d0efca33d8688dc47175cf5af68db6aa60ca1 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Mon, 7 Mar 2022 13:58:05 -0500 Subject: [PATCH 11/18] Fixed bug in error handling Previously there was no way to know whether or not a goroutine had in fact been started; henceforth, the errChan would be closed by the defer since the createReplicas() call would reach the return before all goroutines had the chance to send their err, causing a panic. In this new version, we wait for all goroutines to start before we check for errors on the channel. --- tests/retry/alltoallconfig_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index 95ef5216..6db161d3 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -65,6 +65,8 @@ func TestAllToAllConfigurationStyle3(t *testing.T) { func createReplicas() ([]*replica, error) { replicas := make([]*replica, 0) errChan := make(chan error, *replicaCount) + startedChan := make(chan struct{}, *replicaCount) + startedCnt := 0 defer close(errChan) for i := 1; i <= *replicaCount; i++ { lis, err := net.Listen("tcp", "127.0.0.1:0") @@ -80,11 +82,17 @@ func createReplicas() ([]*replica, error) { RegisterSampleServer(replica.server, replica) replicas = append(replicas, &replica) go func() { + startedChan <- struct{}{} if err := replica.serve(); err != nil { errChan <- fmt.Errorf("failed to serve at %q: %w", replica.address, err) } }() } + for startedCnt < *replicaCount { + <-startedChan + startedCnt++ + } + close(startedChan) select { case err := <-errChan: From 2e934dd9e8750247b159785e19cf76864b6faeef Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Mon, 7 Mar 2022 14:13:13 -0500 Subject: [PATCH 12/18] Various cleanup and documentation --- tests/retry/alltoallconfig_test.go | 52 ++++++++---------------------- 1 file changed, 14 insertions(+), 38 deletions(-) diff --git a/tests/retry/alltoallconfig_test.go b/tests/retry/alltoallconfig_test.go index 6db161d3..37d05347 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/retry/alltoallconfig_test.go @@ -14,8 +14,8 @@ import ( var replicaCount = flag.Int("replicas", 20, "number of replicas to create all-to-all communication") -func TestAllToAllConfigurationStyle2(t *testing.T) { - replicas, err := createReplicas() +func TestAllToAllConfiguration(t *testing.T) { + replicas, err := createReplicas(*replicaCount) if err != nil { t.Fatal(err) } @@ -33,42 +33,16 @@ func TestAllToAllConfigurationStyle2(t *testing.T) { } } -func TestAllToAllConfigurationStyle3(t *testing.T) { - srvs := make([]*replica, *replicaCount) - for i := range srvs { - srvs[i] = &replica{} - } - addrs, closeServers := gorums.TestSetup(t, *replicaCount, func(i int) gorums.ServerIface { - srv := gorums.NewServer() - RegisterSampleServer(srv, srvs[i]) - return srv - }) - nodeMap := make(map[string]uint32) - for i, replica := range srvs { - srvs[i].address = addrs[i] - srvs[i].id = uint32(i) - nodeMap[replica.address] = replica.id - } - for _, replica := range srvs { - replica.createConfiguration(nodeMap) - } - - teardown := func() { - for _, replica := range srvs { - replica.mgr.Close() - } - closeServers() - } - teardown() -} - -func createReplicas() ([]*replica, error) { +// createReplicas returns a slice of replicas. +// The function waits for all serve goroutines to start and one additional +// second to allow the servers to start before returning. +func createReplicas(numReplicas int) ([]*replica, error) { replicas := make([]*replica, 0) - errChan := make(chan error, *replicaCount) - startedChan := make(chan struct{}, *replicaCount) + errChan := make(chan error, numReplicas) + startedChan := make(chan struct{}, numReplicas) startedCnt := 0 defer close(errChan) - for i := 1; i <= *replicaCount; i++ { + for i := 1; i <= numReplicas; i++ { lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return nil, err @@ -88,7 +62,7 @@ func createReplicas() ([]*replica, error) { } }() } - for startedCnt < *replicaCount { + for startedCnt < numReplicas { <-startedChan startedCnt++ } @@ -113,14 +87,16 @@ type replica struct { address string lis net.Listener id uint32 - server *gorums.Server - mgr *Manager + server *gorums.Server // the replica's gRPC server + mgr *Manager // the replica's Gorums manager (used as a client) } func (r replica) WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response *WriteResponse, err error) { return &WriteResponse{New: true}, nil } +// createConfiguration creates a configuration for the replica, allowing +// this replica to communicate with the other replicas in the configuration. func (r *replica) createConfiguration(nodeMap map[string]uint32) error { r.mgr = NewManager(gorums.WithDialTimeout(100*time.Millisecond), gorums.WithGrpcDialOptions( From 0c962f307169b87da48e71a011cb5409a555a995 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Mon, 7 Mar 2022 14:27:25 -0500 Subject: [PATCH 13/18] Renamed to retry test to all2all --- tests/Makefile | 6 +- .../sample.pb.go => all2all/all2all.pb.go} | 109 +++++++++--------- .../sample.proto => all2all/all2all.proto} | 12 +- .../all2all_gorums.pb.go} | 10 +- .../{retry => all2all}/alltoallconfig_test.go | 2 +- 5 files changed, 72 insertions(+), 67 deletions(-) rename tests/{retry/sample.pb.go => all2all/all2all.pb.go} (50%) rename tests/{retry/sample.proto => all2all/all2all.proto} (59%) rename tests/{retry/sample_gorums.pb.go => all2all/all2all_gorums.pb.go} (96%) rename tests/{retry => all2all}/alltoallconfig_test.go (99%) diff --git a/tests/Makefile b/tests/Makefile index 5b8f51ca..64b8ae23 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,7 +1,7 @@ # Tests that should be run each time -RUNTESTS := qf ordering metadata tls unresponsive dummy oneway config correctable +RUNTESTS := qf ordering metadata tls unresponsive dummy oneway config correctable all2all -.PHONY: all qf ordering metadata tls unresponsive dummy oneway config correctable +.PHONY: all qf ordering metadata tls unresponsive dummy oneway config correctable all2all all: $(RUNTESTS) @@ -23,6 +23,8 @@ config: config/config.pb.go config/config_gorums.pb.go correctable: correctable/correctable.pb.go correctable/correctable_gorums.pb.go +all2all: all2all/all2all.pb.go all2all/all2all_gorums.pb.go + %.pb.go : %.proto @protoc -I=..:. --go_out=paths=source_relative:. $< diff --git a/tests/retry/sample.pb.go b/tests/all2all/all2all.pb.go similarity index 50% rename from tests/retry/sample.pb.go rename to tests/all2all/all2all.pb.go index bdd245d5..2cafba4b 100644 --- a/tests/retry/sample.pb.go +++ b/tests/all2all/all2all.pb.go @@ -1,10 +1,10 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.17.3 -// source: sample.proto +// protoc v3.19.4 +// source: all2all/all2all.proto -package retry +package all2all import ( _ "github.com/relab/gorums" @@ -32,7 +32,7 @@ type WriteResponse struct { func (x *WriteResponse) Reset() { *x = WriteResponse{} if protoimpl.UnsafeEnabled { - mi := &file_sample_proto_msgTypes[0] + mi := &file_all2all_all2all_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -45,7 +45,7 @@ func (x *WriteResponse) String() string { func (*WriteResponse) ProtoMessage() {} func (x *WriteResponse) ProtoReflect() protoreflect.Message { - mi := &file_sample_proto_msgTypes[0] + mi := &file_all2all_all2all_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -58,7 +58,7 @@ func (x *WriteResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use WriteResponse.ProtoReflect.Descriptor instead. func (*WriteResponse) Descriptor() ([]byte, []int) { - return file_sample_proto_rawDescGZIP(), []int{0} + return file_all2all_all2all_proto_rawDescGZIP(), []int{0} } func (x *WriteResponse) GetNew() bool { @@ -80,7 +80,7 @@ type WriteRequest struct { func (x *WriteRequest) Reset() { *x = WriteRequest{} if protoimpl.UnsafeEnabled { - mi := &file_sample_proto_msgTypes[1] + mi := &file_all2all_all2all_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -93,7 +93,7 @@ func (x *WriteRequest) String() string { func (*WriteRequest) ProtoMessage() {} func (x *WriteRequest) ProtoReflect() protoreflect.Message { - mi := &file_sample_proto_msgTypes[1] + mi := &file_all2all_all2all_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -106,7 +106,7 @@ func (x *WriteRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use WriteRequest.ProtoReflect.Descriptor instead. func (*WriteRequest) Descriptor() ([]byte, []int) { - return file_sample_proto_rawDescGZIP(), []int{1} + return file_all2all_all2all_proto_rawDescGZIP(), []int{1} } func (x *WriteRequest) GetKey() string { @@ -123,47 +123,48 @@ func (x *WriteRequest) GetValue() string { return "" } -var File_sample_proto protoreflect.FileDescriptor - -var file_sample_proto_rawDesc = []byte{ - 0x0a, 0x0c, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, - 0x72, 0x65, 0x74, 0x72, 0x79, 0x1a, 0x0c, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x22, 0x21, 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x4e, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x03, 0x4e, 0x65, 0x77, 0x22, 0x36, 0x0a, 0x0c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x44, - 0x0a, 0x06, 0x53, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x57, 0x72, 0x69, 0x74, - 0x65, 0x51, 0x43, 0x12, 0x13, 0x2e, 0x72, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x57, 0x72, 0x69, 0x74, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x72, 0x65, 0x74, 0x72, 0x79, - 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x04, - 0xa0, 0xb5, 0x18, 0x01, 0x42, 0x25, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x62, 0x2f, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2f, - 0x74, 0x65, 0x73, 0x74, 0x73, 0x2f, 0x72, 0x65, 0x74, 0x72, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, +var File_all2all_all2all_proto protoreflect.FileDescriptor + +var file_all2all_all2all_proto_rawDesc = []byte{ + 0x0a, 0x15, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, 0x6c, 0x2f, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, + 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, 0x6c, + 0x1a, 0x0c, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x21, + 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x10, 0x0a, 0x03, 0x4e, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x4e, 0x65, + 0x77, 0x22, 0x36, 0x0a, 0x0c, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x10, 0x0a, 0x03, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x48, 0x0a, 0x06, 0x53, 0x61, 0x6d, + 0x70, 0x6c, 0x65, 0x12, 0x3e, 0x0a, 0x07, 0x57, 0x72, 0x69, 0x74, 0x65, 0x51, 0x43, 0x12, 0x15, + 0x2e, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, 0x6c, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, 0x6c, 0x2e, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x04, 0xa0, + 0xb5, 0x18, 0x01, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x62, 0x2f, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2f, 0x74, + 0x65, 0x73, 0x74, 0x73, 0x2f, 0x61, 0x6c, 0x6c, 0x32, 0x61, 0x6c, 0x6c, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( - file_sample_proto_rawDescOnce sync.Once - file_sample_proto_rawDescData = file_sample_proto_rawDesc + file_all2all_all2all_proto_rawDescOnce sync.Once + file_all2all_all2all_proto_rawDescData = file_all2all_all2all_proto_rawDesc ) -func file_sample_proto_rawDescGZIP() []byte { - file_sample_proto_rawDescOnce.Do(func() { - file_sample_proto_rawDescData = protoimpl.X.CompressGZIP(file_sample_proto_rawDescData) +func file_all2all_all2all_proto_rawDescGZIP() []byte { + file_all2all_all2all_proto_rawDescOnce.Do(func() { + file_all2all_all2all_proto_rawDescData = protoimpl.X.CompressGZIP(file_all2all_all2all_proto_rawDescData) }) - return file_sample_proto_rawDescData + return file_all2all_all2all_proto_rawDescData } -var file_sample_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_sample_proto_goTypes = []interface{}{ - (*WriteResponse)(nil), // 0: retry.WriteResponse - (*WriteRequest)(nil), // 1: retry.WriteRequest +var file_all2all_all2all_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_all2all_all2all_proto_goTypes = []interface{}{ + (*WriteResponse)(nil), // 0: all2all.WriteResponse + (*WriteRequest)(nil), // 1: all2all.WriteRequest } -var file_sample_proto_depIdxs = []int32{ - 1, // 0: retry.Sample.WriteQC:input_type -> retry.WriteRequest - 0, // 1: retry.Sample.WriteQC:output_type -> retry.WriteResponse +var file_all2all_all2all_proto_depIdxs = []int32{ + 1, // 0: all2all.Sample.WriteQC:input_type -> all2all.WriteRequest + 0, // 1: all2all.Sample.WriteQC:output_type -> all2all.WriteResponse 1, // [1:2] is the sub-list for method output_type 0, // [0:1] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name @@ -171,13 +172,13 @@ var file_sample_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for field type_name } -func init() { file_sample_proto_init() } -func file_sample_proto_init() { - if File_sample_proto != nil { +func init() { file_all2all_all2all_proto_init() } +func file_all2all_all2all_proto_init() { + if File_all2all_all2all_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_sample_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_all2all_all2all_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*WriteResponse); i { case 0: return &v.state @@ -189,7 +190,7 @@ func file_sample_proto_init() { return nil } } - file_sample_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_all2all_all2all_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*WriteRequest); i { case 0: return &v.state @@ -206,18 +207,18 @@ func file_sample_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_sample_proto_rawDesc, + RawDescriptor: file_all2all_all2all_proto_rawDesc, NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_sample_proto_goTypes, - DependencyIndexes: file_sample_proto_depIdxs, - MessageInfos: file_sample_proto_msgTypes, + GoTypes: file_all2all_all2all_proto_goTypes, + DependencyIndexes: file_all2all_all2all_proto_depIdxs, + MessageInfos: file_all2all_all2all_proto_msgTypes, }.Build() - File_sample_proto = out.File - file_sample_proto_rawDesc = nil - file_sample_proto_goTypes = nil - file_sample_proto_depIdxs = nil + File_all2all_all2all_proto = out.File + file_all2all_all2all_proto_rawDesc = nil + file_all2all_all2all_proto_goTypes = nil + file_all2all_all2all_proto_depIdxs = nil } diff --git a/tests/retry/sample.proto b/tests/all2all/all2all.proto similarity index 59% rename from tests/retry/sample.proto rename to tests/all2all/all2all.proto index 098ea893..2415c7f0 100644 --- a/tests/retry/sample.proto +++ b/tests/all2all/all2all.proto @@ -1,8 +1,8 @@ syntax = "proto3"; -package retry; +package all2all; -option go_package = "github.com/relab/gorums/tests/retry"; +option go_package = "github.com/relab/gorums/tests/all2all"; import "gorums.proto"; @@ -12,8 +12,10 @@ service Sample { } } -message WriteResponse { bool New = 1; } +message WriteResponse { + bool New = 1; +} message WriteRequest { - string Key = 1; + string Key = 1; string Value = 2; -} \ No newline at end of file +} diff --git a/tests/retry/sample_gorums.pb.go b/tests/all2all/all2all_gorums.pb.go similarity index 96% rename from tests/retry/sample_gorums.pb.go rename to tests/all2all/all2all_gorums.pb.go index e9983994..12f94016 100644 --- a/tests/retry/sample_gorums.pb.go +++ b/tests/all2all/all2all_gorums.pb.go @@ -1,10 +1,10 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.7.0-devel -// protoc v3.17.3 -// source: sample.proto +// protoc v3.19.4 +// source: all2all/all2all.proto -package retry +package all2all import ( context "context" @@ -160,7 +160,7 @@ type QuorumSpec interface { func (c *Configuration) WriteQC(ctx context.Context, in *WriteRequest) (resp *WriteResponse, err error) { cd := gorums.QuorumCallData{ Message: in, - Method: "retry.Sample.WriteQC", + Method: "all2all.Sample.WriteQC", } cd.QuorumFunction = func(req protoreflect.ProtoMessage, replies map[uint32]protoreflect.ProtoMessage) (protoreflect.ProtoMessage, bool) { r := make(map[uint32]*WriteResponse, len(replies)) @@ -183,7 +183,7 @@ type Sample interface { } func RegisterSampleServer(srv *gorums.Server, impl Sample) { - srv.RegisterHandler("retry.Sample.WriteQC", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { + srv.RegisterHandler("all2all.Sample.WriteQC", func(ctx gorums.ServerCtx, in *gorums.Message, finished chan<- *gorums.Message) { req := in.Message.(*WriteRequest) defer ctx.Release() resp, err := impl.WriteQC(ctx, req) diff --git a/tests/retry/alltoallconfig_test.go b/tests/all2all/alltoallconfig_test.go similarity index 99% rename from tests/retry/alltoallconfig_test.go rename to tests/all2all/alltoallconfig_test.go index 37d05347..94604102 100644 --- a/tests/retry/alltoallconfig_test.go +++ b/tests/all2all/alltoallconfig_test.go @@ -1,4 +1,4 @@ -package retry +package all2all import ( "flag" From 920b95e15af14324987cde5d77a19634c12de0b8 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 8 Mar 2022 09:24:54 -0500 Subject: [PATCH 14/18] Removed unnecessary close channel and simplified started loop --- tests/all2all/alltoallconfig_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/all2all/alltoallconfig_test.go b/tests/all2all/alltoallconfig_test.go index 94604102..7701fd3e 100644 --- a/tests/all2all/alltoallconfig_test.go +++ b/tests/all2all/alltoallconfig_test.go @@ -40,8 +40,6 @@ func createReplicas(numReplicas int) ([]*replica, error) { replicas := make([]*replica, 0) errChan := make(chan error, numReplicas) startedChan := make(chan struct{}, numReplicas) - startedCnt := 0 - defer close(errChan) for i := 1; i <= numReplicas; i++ { lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -62,11 +60,9 @@ func createReplicas(numReplicas int) ([]*replica, error) { } }() } - for startedCnt < numReplicas { + for range replicas { <-startedChan - startedCnt++ } - close(startedChan) select { case err := <-errChan: From 202000c1378bc37241bd3b68fc4af1117a5450f2 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 8 Mar 2022 09:26:03 -0500 Subject: [PATCH 15/18] Allocate space for all replicas in advance instead of append --- tests/all2all/alltoallconfig_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/all2all/alltoallconfig_test.go b/tests/all2all/alltoallconfig_test.go index 7701fd3e..a9744521 100644 --- a/tests/all2all/alltoallconfig_test.go +++ b/tests/all2all/alltoallconfig_test.go @@ -37,7 +37,7 @@ func TestAllToAllConfiguration(t *testing.T) { // The function waits for all serve goroutines to start and one additional // second to allow the servers to start before returning. func createReplicas(numReplicas int) ([]*replica, error) { - replicas := make([]*replica, 0) + replicas := make([]*replica, numReplicas) errChan := make(chan error, numReplicas) startedChan := make(chan struct{}, numReplicas) for i := 1; i <= numReplicas; i++ { @@ -52,7 +52,7 @@ func createReplicas(numReplicas int) ([]*replica, error) { server: gorums.NewServer(), } RegisterSampleServer(replica.server, replica) - replicas = append(replicas, &replica) + replicas[i-1] = &replica go func() { startedChan <- struct{}{} if err := replica.serve(); err != nil { From 9dc74b2af2204cd2741f0369b3f2acb9ccca78f9 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 8 Mar 2022 09:26:50 -0500 Subject: [PATCH 16/18] Check err from createConfiguration --- tests/all2all/alltoallconfig_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/all2all/alltoallconfig_test.go b/tests/all2all/alltoallconfig_test.go index a9744521..e09dfc65 100644 --- a/tests/all2all/alltoallconfig_test.go +++ b/tests/all2all/alltoallconfig_test.go @@ -12,7 +12,7 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var replicaCount = flag.Int("replicas", 20, "number of replicas to create all-to-all communication") +var replicaCount = flag.Int("replicas", 10, "number of replicas to create all-to-all communication") func TestAllToAllConfiguration(t *testing.T) { replicas, err := createReplicas(*replicaCount) @@ -29,7 +29,9 @@ func TestAllToAllConfiguration(t *testing.T) { nodeMap[replica.address] = replica.id } for _, replica := range replicas { - replica.createConfiguration(nodeMap) + if err := replica.createConfiguration(nodeMap); err != nil { + t.Error(err) + } } } From 5b5e8c1cd753eb746de08df0286e3eaf20510285 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 8 Mar 2022 09:46:53 -0500 Subject: [PATCH 17/18] Added TestGrpcDial to weed out the problem --- tests/all2all/alltoallconfig_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/all2all/alltoallconfig_test.go b/tests/all2all/alltoallconfig_test.go index e09dfc65..4b58a294 100644 --- a/tests/all2all/alltoallconfig_test.go +++ b/tests/all2all/alltoallconfig_test.go @@ -1,6 +1,7 @@ package all2all import ( + context "context" "flag" "fmt" "net" @@ -87,6 +88,7 @@ type replica struct { id uint32 server *gorums.Server // the replica's gRPC server mgr *Manager // the replica's Gorums manager (used as a client) + conn *grpc.ClientConn } func (r replica) WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response *WriteResponse, err error) { @@ -114,3 +116,21 @@ func (r *replica) stopServer() { r.mgr.Close() r.server.Stop() } + +func TestGrpcDial(t *testing.T) { + replicas, err := createReplicas(*replicaCount) + if err != nil { + t.Fatal(err) + } + for _, replica := range replicas { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000)*time.Millisecond) + defer cancel() + replica.conn, err = grpc.DialContext(ctx, replica.address, + grpc.WithBlock(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + t.Fatal(err) + } + } +} From e8aa996eaf7f2b564da771c8ed61bfa134aef6c1 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Tue, 19 Mar 2024 22:54:07 +0100 Subject: [PATCH 18/18] chore: minor tweaks Just wanted to check if this issue still persists (it does), but thought I just make a few tweaks while I was at it. --- tests/all2all/alltoallconfig_test.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/all2all/alltoallconfig_test.go b/tests/all2all/alltoallconfig_test.go index 4b58a294..4c2c05a6 100644 --- a/tests/all2all/alltoallconfig_test.go +++ b/tests/all2all/alltoallconfig_test.go @@ -1,7 +1,7 @@ package all2all import ( - context "context" + "context" "flag" "fmt" "net" @@ -43,19 +43,19 @@ func createReplicas(numReplicas int) ([]*replica, error) { replicas := make([]*replica, numReplicas) errChan := make(chan error, numReplicas) startedChan := make(chan struct{}, numReplicas) - for i := 1; i <= numReplicas; i++ { + for i := range replicas { lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return nil, err } - replica := replica{ + replica := &replica{ address: lis.Addr().String(), id: uint32(i), lis: lis, server: gorums.NewServer(), } RegisterSampleServer(replica.server, replica) - replicas[i-1] = &replica + replicas[i] = replica go func() { startedChan <- struct{}{} if err := replica.serve(); err != nil { @@ -100,8 +100,7 @@ func (r replica) WriteQC(ctx gorums.ServerCtx, request *WriteRequest) (response func (r *replica) createConfiguration(nodeMap map[string]uint32) error { r.mgr = NewManager(gorums.WithDialTimeout(100*time.Millisecond), gorums.WithGrpcDialOptions( - grpc.WithBlock(), // block until connections are made - grpc.WithTransportCredentials(insecure.NewCredentials()), // disable TLS + grpc.WithTransportCredentials(insecure.NewCredentials()), ), ) _, err := r.mgr.NewConfiguration(qspec{}, gorums.WithNodeMap(nodeMap)) @@ -123,10 +122,9 @@ func TestGrpcDial(t *testing.T) { t.Fatal(err) } for _, replica := range replicas { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000)*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10000)*time.Millisecond) defer cancel() replica.conn, err = grpc.DialContext(ctx, replica.address, - grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil {