-
Notifications
You must be signed in to change notification settings - Fork 0
/
consul.go
177 lines (153 loc) · 3.83 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Copyright © 2015 Jason Smith <[email protected]>.
//
// Use of this source code is governed by the LGPL-3
// license that can be found in the LICENSE file.
package consul
import (
"encoding/json"
"flag"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"github.com/duckbunny/herald"
"github.com/duckbunny/service"
"github.com/hashicorp/consul/api"
)
var (
	// TTL is the time to live, in seconds, of the consul health check
	// registered for a service. init overrides it from the CONSUL_TTL
	// environment variable and exposes it as the -consul-ttl flag.
	TTL = 15

	// ServicesKVpath is the root path under which service definitions are
	// kept in the consul key/value store.
	ServicesKVpath = "services"

	// Title is the name this herald is registered under.
	Title = "consul"

	// ConsulConfig is the client configuration Init builds its client from.
	// It starts out as the consul api package's default configuration.
	ConsulConfig = api.DefaultConfig()
)
// init seeds TTL from the CONSUL_TTL environment variable (when it is set to
// a non-empty value) and then registers the -consul-ttl flag, whose default
// is whatever TTL holds at that point. A CONSUL_TTL value that is not an
// integer terminates the process via log.Fatal.
func init() {
	if raw := os.Getenv("CONSUL_TTL"); raw != "" {
		parsed, err := strconv.Atoi(raw)
		if err != nil {
			log.Fatal(err)
		}
		TTL = parsed
	}
	flag.IntVar(&TTL, "consul-ttl", TTL, "TTL for consul microservice heartbeats.")
}
// Consul bundles the consul API handles this package works through, plus the
// channel used to tell the heartbeat loop to stop. Agent and KV may be set by
// the caller; Init fills in whichever of them is still nil.
type Consul struct {
// Agent is the consul agent handle. Start and Stop use it to register and
// deregister the service, and the heartbeat uses it to pass the TTL check.
Agent *api.Agent
// KV is the consul key/value handle. Declare writes the service definition
// through it and GetService reads it back.
KV *api.KV
// heartBeatKill carries the stop signal: Stop sends on it and Heartbeat
// receives from it. New allocates it; a Consul built as a bare literal has
// a nil channel here.
heartBeatKill chan bool
}
// New returns a Consul whose heartbeat kill channel is allocated
// (unbuffered). Agent and KV are left nil.
func New() *Consul {
	return &Consul{heartBeatKill: make(chan bool)}
}
// Start registers s with the consul agent under a TTL health check, marks
// that check as passing once, and launches the heartbeat goroutine that keeps
// it passing.
//
// It returns an error when s.Port is not a base-10 integer or when the agent
// rejects the registration. A failure of the initial TTL pass is logged
// rather than returned: the service is already registered at that point and
// the heartbeat goroutine will try again on its next tick.
func (c *Consul) Start(s *service.Service) error {
	port, err := strconv.Atoi(s.Port)
	if err != nil {
		return err
	}

	// The same ID names both the service and (prefixed) its TTL check, so
	// build it once.
	id := FormattedID(s)
	registration := api.AgentServiceRegistration{
		ID:   id,
		Name: FormattedName(s),
		Port: port,
		Check: &api.AgentServiceCheck{
			TTL: fmt.Sprintf("%vs", TTL),
		},
	}
	if err := c.Agent.ServiceRegister(&registration); err != nil {
		return err
	}

	// Initial pass so the check does not sit in its starting state until the
	// first heartbeat tick.
	checkID := fmt.Sprintf("service:%v", id)
	if err := c.Agent.PassTTL(checkID, "TTL heartbeat"); err != nil {
		log.Printf("consul: initial TTL pass for %v failed: %v", checkID, err)
	}

	// Begin the periodic TTL refresh.
	go c.Heartbeat(s)
	return nil
}
// Stop signals the heartbeat to end and then deregisters s from the consul
// agent, returning the agent's deregistration error, if any.
//
// The signal is a send on heartBeatKill, which New creates unbuffered, so
// Stop blocks until something receives from that channel.
func (c *Consul) Stop(s *service.Service) error {
	c.heartBeatKill <- true

	id := FormattedID(s)
	return c.Agent.ServiceDeregister(id)
}
// Init builds a consul client from ConsulConfig and uses it to populate the
// Agent and KV handles. Handles that are already non-nil are kept, so a
// caller can inject its own before calling Init. The client is created
// unconditionally, and its construction error is returned as-is.
func (c *Consul) Init() error {
	cl, err := api.NewClient(ConsulConfig)
	if err != nil {
		return err
	}

	// Only fill in what the caller has not supplied.
	if c.KV == nil {
		c.KV = cl.KV()
	}
	if c.Agent == nil {
		c.Agent = cl.Agent()
	}
	return nil
}
// Declare stores the JSON encoding of s in the consul key/value store under
// the key produced by FormattedKey. It returns the marshalling error or the
// error from the KV write.
func (c *Consul) Declare(s *service.Service) error {
	payload, err := json.Marshal(s)
	if err != nil {
		return err
	}

	// Flags is left at its zero value.
	entry := &api.KVPair{
		Key:   FormattedKey(s),
		Value: payload,
	}
	_, err = c.KV.Put(entry, nil)
	return err
}
// GetService loads the service definition stored in consul into s. The key
// is derived from s via FormattedKey, so Domain, Title and Version must be
// set before the call.
//
// It returns the KV lookup error, an error when no definition exists at the
// key, or the JSON decoding error.
func (c *Consul) GetService(s *service.Service) error {
	key := FormattedKey(s)
	pair, _, err := c.KV.Get(key, &api.QueryOptions{})
	if err != nil {
		return err
	}
	// A missing key is reported as a nil pair with a nil error; without this
	// guard the dereference below would panic instead of returning an error.
	if pair == nil {
		return fmt.Errorf("consul: service definition not found at key %q", key)
	}
	return json.Unmarshal(pair.Value, s)
}
// FormattedName returns the consul service name for s: Domain, Title and
// Version joined by dashes, with every "." in the result replaced by "-".
func FormattedName(s *service.Service) string {
	return strings.Replace(
		fmt.Sprintf("%v-%v-%v", s.Domain, s.Title, s.Version),
		".", "-", -1,
	)
}
// FormattedID returns the consul service ID for s: the formatted service
// name followed by Host and Port, joined by dashes. Host and Port are used
// verbatim; only the name part has its dots replaced.
func FormattedID(s *service.Service) string {
	name := FormattedName(s)
	return fmt.Sprintf("%v-%v-%v", name, s.Host, s.Port)
}
// FormattedKey returns the consul key/value key that holds the definition of
// s, in the form <ServicesKVpath>/<Domain>/<Title>/<Version>/definition.
func FormattedKey(s *service.Service) string {
	// The root segment is the package-level ServicesKVpath variable; the
	// previous spelling (ServiceKVPath) named no declared identifier.
	return fmt.Sprintf("%v/%v/%v/%v/definition", ServicesKVpath, s.Domain, s.Title, s.Version)
}
// Heartbeat keeps the TTL health check of s passing until a stop signal
// arrives on the kill channel, then returns. It blocks for its whole
// lifetime and is meant to run in its own goroutine (Start launches it).
//
// The check is refreshed every TTL-1 seconds so each pass lands before the
// TTL expires. When that interval would not be positive (TTL <= 1) it falls
// back to half a second, since a ticker cannot be built from a non-positive
// duration. TTL is read once, when the loop starts.
//
// Heartbeat does not deregister the service; the caller that sends the stop
// signal (Stop) does that.
func (c *Consul) Heartbeat(s *service.Service) {
	interval := time.Duration(TTL-1) * time.Second
	if interval <= 0 {
		interval = 500 * time.Millisecond
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	checkID := fmt.Sprintf("service:%v", FormattedID(s))
	for {
		// Wait on both channels at once so a stop request is honoured
		// immediately instead of at the next tick.
		select {
		case <-c.heartBeatKill:
			// Just exit: calling Stop from here would send on the channel
			// this goroutine is the only reader of, and block forever.
			return
		case <-ticker.C:
			if err := c.Agent.PassTTL(checkID, "TTL heartbeat"); err != nil {
				// Keep beating; a transient agent failure should not end
				// the heartbeat.
				log.Printf("consul: TTL heartbeat for %v failed: %v", checkID, err)
			}
		}
	}
}
// Register creates a Consul with New and adds that one instance to herald
// under Title, both as a pool and as a declaration.
func Register() {
	backend := New()
	herald.AddPool(Title, backend)
	herald.AddDeclaration(Title, backend)
}