From f0d005c8114bd9062d92d4a98cf9410255b6700e Mon Sep 17 00:00:00 2001 From: lyt122 <2747177214@qq.com> Date: Tue, 6 Aug 2024 16:48:51 +0800 Subject: [PATCH] refactor codes --- controller/registries/consul/config.go | 98 ++++++--------------- controller/registries/consul/config_test.go | 84 ++++++------------ 2 files changed, 53 insertions(+), 129 deletions(-) diff --git a/controller/registries/consul/config.go b/controller/registries/consul/config.go index 27310d31..342b4db5 100644 --- a/controller/registries/consul/config.go +++ b/controller/registries/consul/config.go @@ -22,8 +22,6 @@ import ( "time" consulapi "github.com/hashicorp/consul/api" - "github.com/nacos-group/nacos-sdk-go/model" - istioapi "istio.io/api/networking/v1alpha3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "mosn.io/htnn/controller/pkg/registry" @@ -42,14 +40,15 @@ func init() { name: om.Name, softDeletedServices: map[consulService]bool{}, done: make(chan struct{}), + clientFactory: factory, } return reg, nil }) } var ( - SleepTime = 120 * time.Second - RegistryType = "consul" + //RegistryType = "consul" + factory ClientFactory = &DefaultClientFactory{} ) type consulCatalog interface { @@ -66,10 +65,11 @@ func (c *ConsulAPI) Services(q *consulapi.QueryOptions) (map[string][]string, *c type Consul struct { consul.RegistryType - logger log.RegistryLogger - store registry.ServiceEntryStore - name string - client *Client + logger log.RegistryLogger + store registry.ServiceEntryStore + name string + client *Client + clientFactory ClientFactory lock sync.RWMutex watchingServices map[consulService]bool @@ -88,12 +88,13 @@ type Client struct { Token string } -type consulService struct { - DataCenter string - ServiceName string +type ClientFactory interface { + NewClient(config *consul.Config) (*Client, error) } -func (reg *Consul) NewClient(config *consul.Config) (*Client, error) { +type DefaultClientFactory struct{} + +func (f *DefaultClientFactory) NewClient(config *consul.Config) (*Client, error) { uri, err := url.Parse(config.ServerUrl) if err != nil { return nil, fmt.Errorf("invalid server url: %s", config.ServerUrl) @@ -118,31 +119,26 @@ func (reg *Consul) NewClient(config *consul.Config) (*Client, error) { }, nil } +type consulService struct { + DataCenter string + ServiceName string +} + func (reg *Consul) Start(c registrytype.RegistryConfig) error { config := c.(*consul.Config) - if reg.client == nil { + client, err := reg.clientFactory.NewClient(config) + if err != nil { + return err + } - client, err := reg.NewClient(config) - if err != nil { - return err - } + reg.client = client - reg.client = client - } services, err := reg.fetchAllServices(reg.client) if err != nil { return err } - //for key := range services { - // err = reg.subscribe(key.ServiceName) - // if err != nil { - // reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, key) - // - // delete(services, key) - // } - //} reg.watchingServices = services @@ -163,13 +159,14 @@ func (reg *Consul) Start(c registrytype.RegistryConfig) error { select { case <-reg.done: reg.logger.Infof("stop refreshing services") - //wait to retry - time.Sleep(SleepTime) + return default: } services, meta, err := reg.client.consulCatalog.Services(q) if err != nil { reg.logger.Errorf("failed to get services, err: %v", err) + time.Sleep(dur) + continue } reg.refresh(services) @@ -263,46 +260,3 @@ func (reg *Consul) refresh(services map[string][]string) { } } - -func (reg *Consul) generateServiceEntry(host string, services []model.SubscribeService) *registry.ServiceEntryWrapper { - portList := make([]*istioapi.ServicePort, 0, 1) - endpoints := make([]*istioapi.WorkloadEntry, 0, len(services)) - - for _, service := range services { - protocol := registry.HTTP - if service.Metadata == nil { - service.Metadata = make(map[string]string) - } - - if service.Metadata["protocol"] != "" { - protocol = registry.ParseProtocol(service.Metadata["protocol"]) - } - - port := &istioapi.ServicePort{ - Name: string(protocol), - Number: uint32(service.Port), - Protocol: string(protocol), - } - if len(portList) == 0 { - portList = append(portList, port) - } - - endpoint := istioapi.WorkloadEntry{ - Address: service.Ip, - Ports: map[string]uint32{port.Protocol: port.Number}, - Labels: service.Metadata, - } - endpoints = append(endpoints, &endpoint) - } - - return ®istry.ServiceEntryWrapper{ - ServiceEntry: istioapi.ServiceEntry{ - Hosts: []string{host}, - Ports: portList, - Location: istioapi.ServiceEntry_MESH_INTERNAL, - Resolution: istioapi.ServiceEntry_STATIC, - Endpoints: endpoints, - }, - Source: RegistryType, - } -} diff --git a/controller/registries/consul/config_test.go b/controller/registries/consul/config_test.go index c4cb6ab4..5e954cdc 100644 --- a/controller/registries/consul/config_test.go +++ b/controller/registries/consul/config_test.go @@ -18,25 +18,22 @@ import ( "testing" "github.com/hashicorp/consul/api" - "github.com/nacos-group/nacos-sdk-go/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" - istioapi "istio.io/api/networking/v1alpha3" - "mosn.io/htnn/controller/pkg/registry" "mosn.io/htnn/controller/pkg/registry/log" "mosn.io/htnn/types/registries/consul" ) func TestNewClient(t *testing.T) { - reg := &Consul{} + reg := &Consul{ + clientFactory: factory, + } config := &consul.Config{ ServerUrl: "http://127.0.0.1:8500", DataCenter: "test", } - client, err := reg.NewClient(config) + client, err := reg.clientFactory.NewClient(config) assert.NoError(t, err) assert.NotNil(t, client) @@ -46,7 +43,7 @@ func TestNewClient(t *testing.T) { DataCenter: "test", } - client, err = reg.NewClient(config) + client, err = reg.clientFactory.NewClient(config) assert.Error(t, err) assert.Nil(t, client) @@ -61,8 +58,23 @@ func (m *MockConsulCatalog) Services(q *api.QueryOptions) (map[string][]string, return nil, nil, nil } +type MockClientFactory struct { + mock.Mock +} + +func (f *MockClientFactory) NewClient(config *consul.Config) (*Client, error) { + mockConsulCatalog := new(MockConsulCatalog) + return &Client{ + consulCatalog: mockConsulCatalog, + DataCenter: "dc1", + NameSpace: "ns1", + Token: "token", + }, nil +} + func TestStart(t *testing.T) { mockConsulCatalog := new(MockConsulCatalog) + cf := new(MockClientFactory) client := &Client{ consulCatalog: mockConsulCatalog, DataCenter: "dc1", @@ -74,14 +86,14 @@ func TestStart(t *testing.T) { logger: log.NewLogger(&log.RegistryLoggerOptions{ Name: "test", }), - client: client, - done: make(chan struct{}), + done: make(chan struct{}), + clientFactory: cf, } config := &consul.Config{} mockConsulCatalog.On("Services", mock.Anything).Return(map[string][]string{"service1": {"dc1"}}, &api.QueryMeta{}, nil) - + reg.client = client err := reg.Start(config) assert.NoError(t, err) @@ -98,7 +110,8 @@ func TestStart(t *testing.T) { logger: log.NewLogger(&log.RegistryLoggerOptions{ Name: "test", }), - done: make(chan struct{}), + done: make(chan struct{}), + clientFactory: factory, } err = reg.Start(config) @@ -125,12 +138,13 @@ func TestRefresh(t *testing.T) { softDeletedServices: map[consulService]bool{}, done: make(chan struct{}), watchingServices: map[consulService]bool{}, + clientFactory: factory, } config := &consul.Config{ ServerUrl: "http://127.0.0.1:8500", } - client, _ := reg.NewClient(config) + client, _ := reg.clientFactory.NewClient(config) reg.client = client services := map[string][]string{ "service1": {"dc1", "dc2"}, @@ -164,50 +178,6 @@ func TestRefresh(t *testing.T) { } -func TestGenerateServiceEntry(t *testing.T) { - host := "test.default-group.public.earth.nacos" - reg := &Consul{} - - type test struct { - name string - services []model.SubscribeService - port *istioapi.ServicePort - endpoint *istioapi.WorkloadEntry - } - tests := []test{} - for input, proto := range registry.ProtocolMap { - s := string(proto) - tests = append(tests, test{ - name: input, - services: []model.SubscribeService{ - {Port: 80, Ip: "1.1.1.1", Metadata: map[string]string{ - "protocol": input, - }}, - }, - port: &istioapi.ServicePort{ - Name: s, - Protocol: s, - Number: 80, - }, - endpoint: &istioapi.WorkloadEntry{ - Address: "1.1.1.1", - Ports: map[string]uint32{s: 80}, - Labels: map[string]string{ - "protocol": input, - }, - }, - }) - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - se := reg.generateServiceEntry(host, tt.services) - require.True(t, proto.Equal(se.ServiceEntry.Ports[0], tt.port)) - require.True(t, proto.Equal(se.ServiceEntry.Endpoints[0], tt.endpoint)) - }) - } -} - func TestFetchAllServices(t *testing.T) { mockConsulCatalog := new(MockConsulCatalog) client := &Client{