Skip to content

Commit

Permalink
refactor codes
Browse files Browse the repository at this point in the history
  • Loading branch information
lyt122 committed Aug 6, 2024
1 parent 2b2fc28 commit f0d005c
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 129 deletions.
98 changes: 26 additions & 72 deletions controller/registries/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"time"

consulapi "github.com/hashicorp/consul/api"
"github.com/nacos-group/nacos-sdk-go/model"
istioapi "istio.io/api/networking/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"mosn.io/htnn/controller/pkg/registry"
Expand All @@ -42,14 +40,15 @@ func init() {
name: om.Name,
softDeletedServices: map[consulService]bool{},
done: make(chan struct{}),
clientFactory: factory,

Check warning on line 43 in controller/registries/consul/config.go

View check run for this annotation

Codecov / codecov/patch

controller/registries/consul/config.go#L43

Added line #L43 was not covered by tests
}
return reg, nil
})
}

var (
SleepTime = 120 * time.Second
RegistryType = "consul"
//RegistryType = "consul"
factory ClientFactory = &DefaultClientFactory{}
)

type consulCatalog interface {
Expand All @@ -66,10 +65,11 @@ func (c *ConsulAPI) Services(q *consulapi.QueryOptions) (map[string][]string, *c

type Consul struct {
consul.RegistryType
logger log.RegistryLogger
store registry.ServiceEntryStore
name string
client *Client
logger log.RegistryLogger
store registry.ServiceEntryStore
name string
client *Client
clientFactory ClientFactory

lock sync.RWMutex
watchingServices map[consulService]bool
Expand All @@ -88,12 +88,13 @@ type Client struct {
Token string
}

type consulService struct {
DataCenter string
ServiceName string
type ClientFactory interface {
NewClient(config *consul.Config) (*Client, error)
}

func (reg *Consul) NewClient(config *consul.Config) (*Client, error) {
type DefaultClientFactory struct{}

func (f *DefaultClientFactory) NewClient(config *consul.Config) (*Client, error) {
uri, err := url.Parse(config.ServerUrl)
if err != nil {
return nil, fmt.Errorf("invalid server url: %s", config.ServerUrl)
Expand All @@ -118,31 +119,26 @@ func (reg *Consul) NewClient(config *consul.Config) (*Client, error) {
}, nil
}

type consulService struct {
DataCenter string
ServiceName string
}

func (reg *Consul) Start(c registrytype.RegistryConfig) error {
config := c.(*consul.Config)

if reg.client == nil {
client, err := reg.clientFactory.NewClient(config)
if err != nil {
return err
}

client, err := reg.NewClient(config)
if err != nil {
return err
}
reg.client = client

reg.client = client
}
services, err := reg.fetchAllServices(reg.client)

if err != nil {
return err
}
//for key := range services {
// err = reg.subscribe(key.ServiceName)
// if err != nil {
// reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, key)
//
// delete(services, key)
// }
//}

reg.watchingServices = services

Expand All @@ -163,13 +159,14 @@ func (reg *Consul) Start(c registrytype.RegistryConfig) error {
select {
case <-reg.done:
reg.logger.Infof("stop refreshing services")
//wait to retry
time.Sleep(SleepTime)
return
default:

Check warning on line 163 in controller/registries/consul/config.go

View check run for this annotation

Codecov / codecov/patch

controller/registries/consul/config.go#L163

Added line #L163 was not covered by tests
}
services, meta, err := reg.client.consulCatalog.Services(q)
if err != nil {
reg.logger.Errorf("failed to get services, err: %v", err)
time.Sleep(dur)
continue

Check warning on line 169 in controller/registries/consul/config.go

View check run for this annotation

Codecov / codecov/patch

controller/registries/consul/config.go#L165-L169

Added lines #L165 - L169 were not covered by tests
}
reg.refresh(services)

Check warning on line 171 in controller/registries/consul/config.go

View check run for this annotation

Codecov / codecov/patch

controller/registries/consul/config.go#L171

Added line #L171 was not covered by tests

Expand Down Expand Up @@ -263,46 +260,3 @@ func (reg *Consul) refresh(services map[string][]string) {
}

}

func (reg *Consul) generateServiceEntry(host string, services []model.SubscribeService) *registry.ServiceEntryWrapper {
portList := make([]*istioapi.ServicePort, 0, 1)
endpoints := make([]*istioapi.WorkloadEntry, 0, len(services))

for _, service := range services {
protocol := registry.HTTP
if service.Metadata == nil {
service.Metadata = make(map[string]string)
}

if service.Metadata["protocol"] != "" {
protocol = registry.ParseProtocol(service.Metadata["protocol"])
}

port := &istioapi.ServicePort{
Name: string(protocol),
Number: uint32(service.Port),
Protocol: string(protocol),
}
if len(portList) == 0 {
portList = append(portList, port)
}

endpoint := istioapi.WorkloadEntry{
Address: service.Ip,
Ports: map[string]uint32{port.Protocol: port.Number},
Labels: service.Metadata,
}
endpoints = append(endpoints, &endpoint)
}

return &registry.ServiceEntryWrapper{
ServiceEntry: istioapi.ServiceEntry{
Hosts: []string{host},
Ports: portList,
Location: istioapi.ServiceEntry_MESH_INTERNAL,
Resolution: istioapi.ServiceEntry_STATIC,
Endpoints: endpoints,
},
Source: RegistryType,
}
}
84 changes: 27 additions & 57 deletions controller/registries/consul/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,22 @@ import (
"testing"

"github.com/hashicorp/consul/api"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
istioapi "istio.io/api/networking/v1alpha3"

"mosn.io/htnn/controller/pkg/registry"
"mosn.io/htnn/controller/pkg/registry/log"
"mosn.io/htnn/types/registries/consul"
)

func TestNewClient(t *testing.T) {
reg := &Consul{}
reg := &Consul{
clientFactory: factory,
}
config := &consul.Config{
ServerUrl: "http://127.0.0.1:8500",
DataCenter: "test",
}
client, err := reg.NewClient(config)
client, err := reg.clientFactory.NewClient(config)

assert.NoError(t, err)
assert.NotNil(t, client)
Expand All @@ -46,7 +43,7 @@ func TestNewClient(t *testing.T) {
DataCenter: "test",
}

client, err = reg.NewClient(config)
client, err = reg.clientFactory.NewClient(config)

assert.Error(t, err)
assert.Nil(t, client)
Expand All @@ -61,8 +58,23 @@ func (m *MockConsulCatalog) Services(q *api.QueryOptions) (map[string][]string,
return nil, nil, nil
}

type MockClientFactory struct {
mock.Mock
}

func (f *MockClientFactory) NewClient(config *consul.Config) (*Client, error) {
mockConsulCatalog := new(MockConsulCatalog)
return &Client{
consulCatalog: mockConsulCatalog,
DataCenter: "dc1",
NameSpace: "ns1",
Token: "token",
}, nil
}

func TestStart(t *testing.T) {
mockConsulCatalog := new(MockConsulCatalog)
cf := new(MockClientFactory)
client := &Client{
consulCatalog: mockConsulCatalog,
DataCenter: "dc1",
Expand All @@ -74,14 +86,14 @@ func TestStart(t *testing.T) {
logger: log.NewLogger(&log.RegistryLoggerOptions{
Name: "test",
}),
client: client,
done: make(chan struct{}),
done: make(chan struct{}),
clientFactory: cf,
}

config := &consul.Config{}

mockConsulCatalog.On("Services", mock.Anything).Return(map[string][]string{"service1": {"dc1"}}, &api.QueryMeta{}, nil)

reg.client = client
err := reg.Start(config)
assert.NoError(t, err)

Expand All @@ -98,7 +110,8 @@ func TestStart(t *testing.T) {
logger: log.NewLogger(&log.RegistryLoggerOptions{
Name: "test",
}),
done: make(chan struct{}),
done: make(chan struct{}),
clientFactory: factory,
}

err = reg.Start(config)
Expand All @@ -125,12 +138,13 @@ func TestRefresh(t *testing.T) {
softDeletedServices: map[consulService]bool{},
done: make(chan struct{}),
watchingServices: map[consulService]bool{},
clientFactory: factory,
}

config := &consul.Config{
ServerUrl: "http://127.0.0.1:8500",
}
client, _ := reg.NewClient(config)
client, _ := reg.clientFactory.NewClient(config)
reg.client = client
services := map[string][]string{
"service1": {"dc1", "dc2"},
Expand Down Expand Up @@ -164,50 +178,6 @@ func TestRefresh(t *testing.T) {

}

func TestGenerateServiceEntry(t *testing.T) {
host := "test.default-group.public.earth.nacos"
reg := &Consul{}

type test struct {
name string
services []model.SubscribeService
port *istioapi.ServicePort
endpoint *istioapi.WorkloadEntry
}
tests := []test{}
for input, proto := range registry.ProtocolMap {
s := string(proto)
tests = append(tests, test{
name: input,
services: []model.SubscribeService{
{Port: 80, Ip: "1.1.1.1", Metadata: map[string]string{
"protocol": input,
}},
},
port: &istioapi.ServicePort{
Name: s,
Protocol: s,
Number: 80,
},
endpoint: &istioapi.WorkloadEntry{
Address: "1.1.1.1",
Ports: map[string]uint32{s: 80},
Labels: map[string]string{
"protocol": input,
},
},
})
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
se := reg.generateServiceEntry(host, tt.services)
require.True(t, proto.Equal(se.ServiceEntry.Ports[0], tt.port))
require.True(t, proto.Equal(se.ServiceEntry.Endpoints[0], tt.endpoint))
})
}
}

func TestFetchAllServices(t *testing.T) {
mockConsulCatalog := new(MockConsulCatalog)
client := &Client{
Expand Down

0 comments on commit f0d005c

Please sign in to comment.