diff --git a/controller/registries/consul/config.go b/controller/registries/consul/config.go index 312deb9f..417a656b 100644 --- a/controller/registries/consul/config.go +++ b/controller/registries/consul/config.go @@ -45,10 +45,6 @@ func init() { }) } -const ( - defaultToken = "" -) - type Consul struct { consul.RegistryType logger log.RegistryLogger @@ -70,11 +66,7 @@ type Client struct { DataCenter string NameSpace string -} - -type consulService struct { - DataCenter string - ServiceName string + Token string } func (reg *Consul) NewClient(config *consul.Config) (*Client, error) { @@ -85,7 +77,7 @@ func (reg *Consul) NewClient(config *consul.Config) (*Client, error) { clientConfig := consulapi.DefaultConfig() clientConfig.Address = uri.Host clientConfig.Scheme = uri.Scheme - clientConfig.Token = defaultToken + clientConfig.Token = config.Token clientConfig.Datacenter = config.DataCenter client, err := consulapi.NewClient(clientConfig) @@ -97,9 +89,16 @@ func (reg *Consul) NewClient(config *consul.Config) (*Client, error) { consulClient: client, consulCatalog: client.Catalog(), DataCenter: config.DataCenter, + NameSpace: config.Namespace, + Token: config.Token, }, nil } +type consulService struct { + Tag string + ServiceName string +} + func (reg *Consul) Start(c registrytype.RegistryConfig) error { config := c.(*consul.Config) @@ -108,11 +107,13 @@ func (reg *Consul) Start(c registrytype.RegistryConfig) error { return err } - services, err := reg.fetchAllServices(client) + reg.client = client + + services, err := reg.fetchAllServices(reg.client) + if err != nil { - return fmt.Errorf("fetch all services error: %v", err) + return err } - reg.client = client reg.watchingServices = services @@ -122,22 +123,29 @@ func (reg *Consul) Start(c registrytype.RegistryConfig) error { } go func() { reg.logger.Infof("start refreshing services") - ticker := time.NewTicker(dur) - //q := consulapi.QueryOptions{ - // WaitTime: dur, - //} - defer ticker.Stop() + q := &consulapi.QueryOptions{ + WaitTime: dur, + Namespace: config.Namespace, + Datacenter: config.DataCenter, + Token: config.Token, + } for { + select { - case <-ticker.C: - err := reg.refresh() - if err != nil { - reg.logger.Errorf("failed to refresh services, err: %v", err) - } case <-reg.done: reg.logger.Infof("stop refreshing services") return + default: + } + services, meta, err := reg.client.consulCatalog.Services(q) + if err != nil { + reg.logger.Errorf("failed to get services, err: %v", err) + time.Sleep(dur) + continue } + reg.refresh(services) + + q.WaitIndex = meta.LastIndex } }() @@ -160,13 +168,28 @@ func (reg *Consul) Reload(c registrytype.RegistryConfig) error { return nil } -func (reg *Consul) refresh() error { - return nil -} - func (reg *Consul) fetchAllServices(client *Client) (map[consulService]bool, error) { - fmt.Println(client) - return nil, nil + q := &consulapi.QueryOptions{} + q.Datacenter = client.DataCenter + q.Namespace = client.NameSpace + q.Token = client.Token + services, _, err := client.consulCatalog.Services(q) + + if err != nil { + reg.logger.Errorf("failed to get service, err: %v", err) + return nil, err + } + serviceMap := make(map[consulService]bool) + for serviceName, tags := range services { + for _, tag := range tags { + service := consulService{ + Tag: tag, + ServiceName: serviceName, + } + serviceMap[service] = true + } + } + return serviceMap, nil } func (reg *Consul) subscribe(serviceName string) error { @@ -178,3 +201,37 @@ func (reg *Consul) unsubscribe(serviceName string) error { fmt.Println(serviceName) return nil } + +func (reg *Consul) refresh(services map[string][]string) { + serviceMap := make(map[consulService]bool) + for serviceName, tags := range services { + for _, tag := range tags { + service := consulService{ + Tag: tag, + ServiceName: serviceName, + } + serviceMap[service] = true + if _, ok := reg.watchingServices[service]; !ok { + err := reg.subscribe(serviceName) + if err != nil { + reg.logger.Errorf("failed to subscribe service, err: %v, service: %v", err, serviceName) + delete(serviceMap, service) + } + } + } + } + + prevFetchServices := reg.watchingServices + reg.watchingServices = serviceMap + + for key := range prevFetchServices { + if _, ok := serviceMap[key]; !ok { + err := reg.unsubscribe(key.ServiceName) + if err != nil { + reg.logger.Errorf("failed to unsubscribe service, err: %v, service: %v", err, key) + } + reg.softDeletedServices[key] = true + } + } + +} diff --git a/controller/registries/consul/config_test.go b/controller/registries/consul/config_test.go index c3f8caf6..bbbc1753 100644 --- a/controller/registries/consul/config_test.go +++ b/controller/registries/consul/config_test.go @@ -15,8 +15,12 @@ package consul import ( + "errors" + "reflect" "testing" + "github.com/agiledragon/gomonkey/v2" + "github.com/hashicorp/consul/api" "github.com/stretchr/testify/assert" "mosn.io/htnn/controller/pkg/registry/log" @@ -50,28 +54,42 @@ func TestStart(t *testing.T) { logger: log.NewLogger(&log.RegistryLoggerOptions{ Name: "test", }), - softDeletedServices: map[consulService]bool{}, - done: make(chan struct{}), - watchingServices: map[consulService]bool{}, - } - config := &consul.Config{ - ServerUrl: "http://127.0.0.1:8500", + done: make(chan struct{}), } + patches := gomonkey.ApplyPrivateMethod(reflect.TypeOf(reg), "fetchAllServices", func(_ *Consul, client *Client) (map[consulService]bool, error) { + return map[consulService]bool{ + {ServiceName: "service1", Tag: "tag1"}: true, + {ServiceName: "service2", Tag: "tag2"}: true, + }, nil + }) + config := &consul.Config{} err := reg.Start(config) - assert.NoError(t, err) - + assert.Nil(t, err) err = reg.subscribe("123") assert.Nil(t, err) err = reg.unsubscribe("123") assert.Nil(t, err) - err = reg.refresh() - assert.Nil(t, err) - err = reg.Stop() assert.Nil(t, err) + + patches.Reset() + + config = &consul.Config{} + + reg = &Consul{ + logger: log.NewLogger(&log.RegistryLoggerOptions{ + Name: "test", + }), + done: make(chan struct{}), + } + + err = reg.Start(config) + assert.Error(t, err) + + close(reg.done) } func TestReload(t *testing.T) { @@ -83,3 +101,105 @@ func TestReload(t *testing.T) { err := reg.Reload(config) assert.NoError(t, err) } + +func TestRefresh(t *testing.T) { + reg := &Consul{ + logger: log.NewLogger(&log.RegistryLoggerOptions{ + Name: "test", + }), + softDeletedServices: map[consulService]bool{}, + done: make(chan struct{}), + watchingServices: map[consulService]bool{}, + } + + config := &consul.Config{ + ServerUrl: "http://127.0.0.1:8500", + } + client, _ := reg.NewClient(config) + reg.client = client + services := map[string][]string{ + "service1": {"dc1", "dc2"}, + "service2": {"dc1"}, + } + + reg.refresh(services) + + assert.Len(t, reg.watchingServices, 3) + assert.Contains(t, reg.watchingServices, consulService{ServiceName: "service1", Tag: "dc1"}) + assert.Contains(t, reg.watchingServices, consulService{ServiceName: "service1", Tag: "dc2"}) + assert.Contains(t, reg.watchingServices, consulService{ServiceName: "service2", Tag: "dc1"}) + assert.Empty(t, reg.softDeletedServices) + + reg = &Consul{ + logger: log.NewLogger(&log.RegistryLoggerOptions{ + Name: "test", + }), + softDeletedServices: map[consulService]bool{}, + watchingServices: map[consulService]bool{ + {ServiceName: "service1", Tag: "dc1"}: true, + }, + } + + services = map[string][]string{} + + reg.refresh(services) + + assert.Len(t, reg.watchingServices, 0) + assert.Len(t, reg.softDeletedServices, 1) + +} + +func TestFetchAllServices(t *testing.T) { + t.Run("Test fetchAllServices method", func(t *testing.T) { + reg := &Consul{ + logger: log.NewLogger(&log.RegistryLoggerOptions{ + Name: "test", + }), + } + client := &Client{ + consulCatalog: &api.Catalog{}, + DataCenter: "dc1", + NameSpace: "ns1", + Token: "token", + } + + patches := gomonkey.ApplyMethod(reflect.TypeOf(client.consulCatalog), "Services", func(_ *api.Catalog, q *api.QueryOptions) (map[string][]string, *api.QueryMeta, error) { + return map[string][]string{ + "service1": {"tag1", "tag2"}, + "service2": {"tag3"}, + }, nil, nil + }) + defer patches.Reset() + + services, err := reg.fetchAllServices(client) + assert.NoError(t, err) + assert.NotNil(t, services) + assert.True(t, services[consulService{ServiceName: "service1", Tag: "tag1"}]) + assert.True(t, services[consulService{ServiceName: "service1", Tag: "tag2"}]) + assert.True(t, services[consulService{ServiceName: "service2", Tag: "tag3"}]) + }) + + t.Run("Test fetchAllServices method with error", func(t *testing.T) { + reg := &Consul{ + logger: log.NewLogger(&log.RegistryLoggerOptions{ + Name: "test", + }), + } + client := &Client{ + consulCatalog: &api.Catalog{}, + DataCenter: "dc1", + NameSpace: "ns1", + Token: "token", + } + + patches := gomonkey.ApplyMethod(reflect.TypeOf(client.consulCatalog), "Services", func(_ *api.Catalog, q *api.QueryOptions) (map[string][]string, *api.QueryMeta, error) { + return nil, nil, errors.New("mock error") + }) + defer patches.Reset() + + services, err := reg.fetchAllServices(client) + assert.Error(t, err) + assert.Equal(t, "mock error", err.Error()) + assert.Nil(t, services) + }) +} diff --git a/types/registries/consul/config.pb.go b/types/registries/consul/config.pb.go index ddbc964a..6fdf52f2 100644 --- a/types/registries/consul/config.pb.go +++ b/types/registries/consul/config.pb.go @@ -44,7 +44,9 @@ type Config struct { ServerUrl string `protobuf:"bytes,1,opt,name=server_url,json=serverUrl,proto3" json:"server_url,omitempty"` DataCenter string `protobuf:"bytes,2,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"` - ServiceRefreshInterval *durationpb.Duration `protobuf:"bytes,3,opt,name=service_refresh_interval,json=serviceRefreshInterval,proto3" json:"service_refresh_interval,omitempty"` + Namespace string `protobuf:"bytes,3,opt,name=namespace,proto3" json:"namespace,omitempty"` + Token string `protobuf:"bytes,4,opt,name=token,proto3" json:"token,omitempty"` + ServiceRefreshInterval *durationpb.Duration `protobuf:"bytes,5,opt,name=service_refresh_interval,json=serviceRefreshInterval,proto3" json:"service_refresh_interval,omitempty"` } func (x *Config) Reset() { @@ -93,6 +95,20 @@ func (x *Config) GetDataCenter() string { return "" } +func (x *Config) GetNamespace() string { + if x != nil { + return x.Namespace + } + return "" +} + +func (x *Config) GetToken() string { + if x != nil { + return x.Token + } + return "" +} + func (x *Config) GetServiceRefreshInterval() *durationpb.Duration { if x != nil { return x.ServiceRefreshInterval @@ -110,21 +126,25 @@ var file_types_registries_consul_config_proto_rawDesc = []byte{ 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x17, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, - 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb3, 0x01, 0x0a, 0x06, 0x43, 0x6f, 0x6e, + 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe7, 0x01, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x27, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x08, 0xfa, 0x42, 0x05, 0x72, 0x03, 0x88, 0x01, 0x01, 0x52, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x55, 0x72, 0x6c, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x63, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x43, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x12, 0x5f, 0x0a, - 0x18, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x5f, 0x72, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, - 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x0a, 0xfa, 0x42, 0x07, 0xaa, - 0x01, 0x04, 0x32, 0x02, 0x08, 0x01, 0x52, 0x16, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, - 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x42, 0x26, - 0x5a, 0x24, 0x6d, 0x6f, 0x73, 0x6e, 0x2e, 0x69, 0x6f, 0x2f, 0x68, 0x74, 0x6e, 0x6e, 0x2f, 0x74, - 0x79, 0x70, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x69, 0x65, 0x73, 0x2f, - 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x09, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x43, 0x65, 0x6e, 0x74, 0x65, 0x72, 0x12, 0x1c, 0x0a, + 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, + 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, + 0x6e, 0x12, 0x5f, 0x0a, 0x18, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x5f, 0x72, 0x65, 0x66, + 0x72, 0x65, 0x73, 0x68, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x0a, + 0xfa, 0x42, 0x07, 0xaa, 0x01, 0x04, 0x32, 0x02, 0x08, 0x01, 0x52, 0x16, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, + 0x61, 0x6c, 0x42, 0x26, 0x5a, 0x24, 0x6d, 0x6f, 0x73, 0x6e, 0x2e, 0x69, 0x6f, 0x2f, 0x68, 0x74, + 0x6e, 0x6e, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, + 0x69, 0x65, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/types/registries/consul/config.pb.validate.go b/types/registries/consul/config.pb.validate.go index 27e88e0b..7129353b 100644 --- a/types/registries/consul/config.pb.validate.go +++ b/types/registries/consul/config.pb.validate.go @@ -79,6 +79,10 @@ func (m *Config) validate(all bool) error { // no validation rules for DataCenter + // no validation rules for Namespace + + // no validation rules for Token + if d := m.GetServiceRefreshInterval(); d != nil { dur, err := d.AsDuration(), d.CheckValid() if err != nil { diff --git a/types/registries/consul/config.proto b/types/registries/consul/config.proto index 154f93d4..6380d017 100644 --- a/types/registries/consul/config.proto +++ b/types/registries/consul/config.proto @@ -24,6 +24,8 @@ option go_package = "mosn.io/htnn/types/registries/consul"; message Config { string server_url = 1 [(validate.rules).string = {uri: true}]; string data_center = 2; - google.protobuf.Duration service_refresh_interval = 3 + string namespace = 3; + string token = 4; + google.protobuf.Duration service_refresh_interval = 5 [(validate.rules).duration = {gte {seconds: 1}}]; }