-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
127 lines (117 loc) · 3.2 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package drift
import (
"strings"
"github.com/mayur-tolexo/aqua"
"github.com/mayur-tolexo/drift/lib"
)
//DS is the drift service
type ds struct {
aqua.RestService `prefix:"drift" root:"/" version:"1"`
consumerCount aqua.GET `url:"consumer/"`
stopAdmin aqua.GET `url:"stop/admin/"`
startAdmin aqua.POST `url:"start/admin/"`
addConsumer aqua.POST `url:"add/consumer/"`
publishReq aqua.POST `url:"pub/request/"`
killConsumer aqua.POST `url:"kill/consumer/"`
admin aqua.POST `url:"admin/"`
drift *Drift
}
//AddConsumer will add new consumer to the given topic
func (d *ds) AddConsumer(req aqua.Aide) (int, interface{}) {
var (
data interface{}
payload AddConstumer
err error
)
if payload, err = vAddConsumer(req); err == nil {
data, err = d.drift.AddConsumer(payload)
}
return lib.BuildResponse(data, err)
}
//PublishReq will publish request
func (d *ds) PublishReq(req aqua.Aide) (int, interface{}) {
var (
data interface{}
payload Publish
err error
)
if payload, err = vPublishReq(req); err == nil {
data, err = pPublishReq(d.drift, payload)
}
return lib.BuildResponse(data, err)
}
//ConsumerCount will return the consumer count of the channel of given topic
//pass topic and channel in query params.
//Is need to get count of total consumer of a topic then only pass topic
func (d *ds) ConsumerCount(qParam aqua.Aide) (int, interface{}) {
count := 0
qParam.LoadVars()
topic := qParam.QueryVar["topic"]
channel := qParam.QueryVar["channel"]
if channel == "" {
for key, val := range d.drift.consumers {
if strings.HasPrefix(key, topic) {
count += len(val)
}
}
} else {
count = len(d.drift.consumers[hash(topic, channel)])
}
return lib.BuildResponse(count, nil)
}
//KillConsumer will kill consumer of given topic
func (d *ds) KillConsumer(req aqua.Aide) (int, interface{}) {
var (
data interface{}
payload KillConsumer
err error
)
if payload, err = vKillConsumer(req); err == nil {
data, err = d.drift.killConsumer(payload)
}
return lib.BuildResponse(data, err)
}
//StartAdmin will start admin
func (d *ds) StartAdmin(req aqua.Aide) (int, interface{}) {
var (
data interface{}
err error
)
if err = d.drift.admin.vStartAdmin(req); err == nil {
if d.drift.admin.adminRunning {
err = lib.VError("Already Running at", d.drift.admin.httpAddrs)
} else {
go d.drift.admin.startAdmin()
data = "Admin started at " + d.drift.admin.httpAddrs
}
}
return lib.BuildResponse(data, err)
}
//StopAdmin will stop admin
func (d *ds) StopAdmin(req aqua.Aide) (int, interface{}) {
var data interface{}
if d.drift.admin.adminRunning {
d.drift.admin.exitAdmin <- 1
<-d.drift.admin.exitAdmin
data = "DONE"
} else {
data = "Not Running"
}
return lib.BuildResponse(data, nil)
}
//Admin will do the admin actions
func (d *ds) Admin(req aqua.Aide) (int, interface{}) {
var (
data interface{}
payload Admin
err error
)
if payload, err = vAdmin(req); err == nil {
if d.drift.admin.adminRunning {
data, err = d.drift.admin.doAction(payload, req.Request.Header.Get(d.drift.admin.aclHTTPHeader))
} else {
err = lib.VError("Admin not running")
}
}
return lib.BuildResponse(data, err)
}