Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support ttl check and update ttl #17

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions consul_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package consul

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/registry"
"github.com/hashicorp/consul/api"
)
Expand All @@ -30,8 +33,9 @@ type options struct {
}

type consulRegistry struct {
consulClient *api.Client
opts options
consulClient *api.Client
opts options
cancelUpdateTTL context.CancelFunc
}

const kvJoinChar = ":"
Expand Down Expand Up @@ -135,12 +139,23 @@ func (c *consulRegistry) Register(info *registry.Info) error {
Check: c.opts.check,
}

if c.opts.check != nil {
if c.opts.check != nil && c.opts.check.TTL == "" {
c.opts.check.TCP = fmt.Sprintf("%s:%d", host, port)
svcInfo.Check = c.opts.check
}

return c.consulClient.Agent().ServiceRegister(svcInfo)
if err := c.consulClient.Agent().ServiceRegister(svcInfo); err != nil {
return err
}

if c.opts.check.TTL != "" {
if ttl, err := time.ParseDuration(c.opts.check.TTL); err != nil {
return err
} else {
return c.startTTLHeartbeat(ttl)
}
}
return nil
}

// Deregister deregister a service from consul.
Expand All @@ -149,9 +164,41 @@ func (c *consulRegistry) Deregister(info *registry.Info) error {
if err != nil {
return err
}

if c.cancelUpdateTTL != nil {
defer c.cancelUpdateTTL()
}
return c.consulClient.Agent().ServiceDeregister(svcID)
}

// startTTLHeartbeat start a goroutine to periodically update TTL.
func (c *consulRegistry) startTTLHeartbeat(ttl time.Duration) error {
if ttl <= 1*time.Second {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个参数校验可以放在更前边,放在注册之前去校验 Agent().ServiceRegister(svcInfo);

return errors.New("consul check ttl must be greater than one second")
}

ctx, cancel := context.WithCancel(context.Background())
c.cancelUpdateTTL = cancel
go func() {
if err := c.consulClient.Agent().UpdateTTL(c.opts.check.CheckID, "online", api.HealthPassing); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

另外还应该评估下 Deregister 时 是否有必要主动停止这里的 更新,如果 调用 Deregister 但是距离真正服务停止还有一段时间,是否会有时间空隙,然后客户端会获取到这个准备下线的实例,去请求却又请求不到 造成失败。这种可以评估一下。

klog.Errorf("update ttl to consul failed, err=%v", err)
}
ticker := time.NewTicker(ttl - 1*time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.consulClient.Agent().UpdateTTL(c.opts.check.CheckID, "online", api.HealthPassing); err != nil {
klog.Errorf("update ttl to consul failed, err=%v", err)
}
case <-ctx.Done():
return
}
}
}()
return nil
}

func validateRegistryInfo(info *registry.Info) error {
if info.ServiceName == "" {
return errors.New("missing service name in consul register")
Expand Down
82 changes: 76 additions & 6 deletions consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"log"
"net"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -38,10 +39,11 @@ const (
)

var (
consulClient *consulapi.Client
cRegistry registry.Registry
cResolver discovery.Resolver
localIpAddr string
consulClient *consulapi.Client
cRegistry registry.Registry
cRegistryWithTTL registry.Registry
cResolver discovery.Resolver
localIpAddr string
)

func init() {
Expand All @@ -60,6 +62,19 @@ func init() {
}
cRegistry = r

r, err = NewConsulRegister(consulAddr, WithCheck(
&consulapi.AgentServiceCheck{
CheckID: "TEST-MY-CHECK-ID1",
TTL: "5s",
Timeout: "5s",
DeregisterCriticalServiceAfter: "1m",
},
))
if err != nil {
return
}
cRegistryWithTTL = r

resolver, err := NewConsulResolver(consulAddr)
if err != nil {
return
Expand Down Expand Up @@ -133,7 +148,6 @@ func TestRegister(t *testing.T) {
"k2": "vv2",
"k3": "vv3",
}
tagList = []string{"k1:vv1", "k2:vv2", "k3:vv3"}
)

// listen on the port, and wait for the health check to connect
Expand Down Expand Up @@ -165,7 +179,63 @@ func TestRegister(t *testing.T) {
assert.Equal(t, testSvcName, gotSvc.Service)
assert.Equal(t, testSvcAddr.String(), fmt.Sprintf("%s:%d", gotSvc.Address, gotSvc.Port))
assert.Equal(t, testSvcWeight, gotSvc.Weights.Passing)
assert.Equal(t, tagList, gotSvc.Tags)
assert.Equal(t, len(tagMap), len(gotSvc.Tags))
for _, tag := range gotSvc.Tags {
kv := strings.Split(tag, ":")
k, v := kv[0], kv[1]
assert.Equal(t, tagMap[k], v)
}
}
}

// TestRegisterWithTTLCheck tests the Register function with ttl check.
func TestRegisterWithTTLCheck(t *testing.T) {
var (
testSvcName = strconv.Itoa(int(time.Now().Unix())) + ".svc.local"
testSvcPort = 8085
testSvcWeight = 777
tagMap = map[string]string{
"k1": "vv1",
"k2": "vv2",
"k3": "vv3",
}
)

// listen on the port, and wait for the health check to connect
addr := fmt.Sprintf("%s:%d", localIpAddr, testSvcPort)
lis, err := net.Listen("tcp", addr)
if err != nil {
t.Errorf("listen tcp %s failed!", addr)
t.Fail()
}
defer lis.Close()

testSvcAddr, _ := net.ResolveTCPAddr("tcp", addr)
info := &registry.Info{
ServiceName: testSvcName,
Weight: testSvcWeight,
Addr: testSvcAddr,
Tags: tagMap,
}
err = cRegistryWithTTL.Register(info)
assert.Nil(t, err)
// wait for health check passing
time.Sleep(time.Second * 6)

list, _, err := consulClient.Health().Service(testSvcName, "", true, nil)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里去获取服务实例列表,建议多等待几个 TTL 的周期,或者连续去校验几个 TTL 周期,证明更新是有效的。

assert.Nil(t, err)
if assert.Equal(t, 1, len(list)) {
ss := list[0]
gotSvc := ss.Service
assert.Equal(t, testSvcName, gotSvc.Service)
assert.Equal(t, testSvcAddr.String(), fmt.Sprintf("%s:%d", gotSvc.Address, gotSvc.Port))
assert.Equal(t, testSvcWeight, gotSvc.Weights.Passing)
assert.Equal(t, len(tagMap), len(gotSvc.Tags))
for _, tag := range gotSvc.Tags {
kv := strings.Split(tag, ":")
k, v := kv[0], kv[1]
assert.Equal(t, tagMap[k], v)
}
}
}

Expand Down
Loading