-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
180 lines (154 loc) · 5.05 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package main
import (
"context"
_ "embed"
"flag"
"fmt"
"github.com/pb82/prometheus-toolbox/api"
"github.com/pb82/prometheus-toolbox/internal"
"github.com/pb82/prometheus-toolbox/pkg/precalculated"
"github.com/pb82/prometheus-toolbox/pkg/proxy"
"github.com/pb82/prometheus-toolbox/pkg/remotewrite"
"github.com/pb82/prometheus-toolbox/pkg/stream"
"github.com/pb82/prometheus-toolbox/version"
"log"
"net/url"
"os"
"os/signal"
"path"
"sync"
"syscall"
)
// Default values for the command line flags registered in init.
const (
// DefaultConfigFile is the default for the --config.file flag.
DefaultConfigFile = "./config.yml"
// DefaultBatchSize is the default for --batch.size, the maximum number of
// samples per remote write request.
DefaultBatchSize = 500
// DefaultProxyListenPort is the default for --proxy.listen.port.
DefaultProxyListenPort = 3241
// DefaultRemoteWriteEndpoint is the default for --prometheus.url.suffix; main
// joins it onto the path of the Prometheus base url.
DefaultRemoteWriteEndpoint = "/api/v1/write"
)
// Command line flag values. Every pointer is assigned in init via the flag
// package and only holds the parsed value after flag.Parse has run in main.
var (
// --prometheus.url: prometheus base url (required for sending samples).
prometheusUrl *string
// --config.file: config file location.
configFile *string
// --version: print version and exit.
printVersion *bool
// --batch.size: max number of samples per remote write request.
batchSize *int
// --proxy.listen: receive remote write requests.
proxyListen *bool
// --proxy.listen.port: port to receive remote write requests.
proxyListenPort *int
// --environment: print environment setup script and exit.
environment *bool
// --init: print sample config file and exit.
initialize *bool
// --oidc.clientId: oidc client id.
oidcClientId *string
// --oidc.clientSecret: oidc client secret.
oidcClientSecret *string
// --oidc.issuer: oidc token issuer url.
oidcIssuerUrl *string
// --oidc.audience: oidc audience.
oidcAudience *string
// --oidc.enabled: enable oidc token authentication.
oidcEnabled *bool
// --prometheus.url.suffix: alternate remote write endpoint path.
remoteWriteSuffix *string
// --rules: print sample alerting rules file and exit.
rules *bool
)
// Files embedded into the binary at build time. main prints them verbatim
// when the corresponding print-and-exit flag is set.
var (
// environmentSetupScript holds the contents of environment.sh; printed by --environment.
//go:embed environment.sh
environmentSetupScript string
// exampleConfig holds the contents of config.yml; printed by --init.
//go:embed config.yml
exampleConfig string
// exampleRules holds the contents of rules.yml; printed by --rules.
//go:embed rules.yml
exampleRules string
)
// main is the entry point of the prometheus toolbox.
//
// After parsing the command line it first serves the print-and-exit flags
// (--version, --environment, --init, --rules). Otherwise it:
//
//  1. reads and parses the config file given by --config.file,
//  2. builds the remote write URL from --prometheus.url and
//     --prometheus.url.suffix,
//  3. sends all precalculated series in batches of at most --batch.size,
//  4. calls stream.StartStreamWriters and, when --proxy.listen is set,
//     proxy.StartListener,
//  5. blocks on the WaitGroup that was handed to the stream writers.
//
// Every setup failure is reported as a one-line message and terminates the
// process with exit status 1.
func main() {
	flag.Parse()

	// Print-and-exit flags: none of them needs a config file or a
	// Prometheus URL, so they are handled before any validation.
	switch {
	case *printVersion:
		fmt.Printf("Prometheus toolbox v%v\n", version.Version)
		os.Exit(0)
	case *environment:
		fmt.Println(environmentSetupScript)
		os.Exit(0)
	case *initialize:
		fmt.Println(exampleConfig)
		os.Exit(0)
	case *rules:
		fmt.Println(exampleRules)
		os.Exit(0)
	}

	rawConfig, err := os.ReadFile(*configFile)
	if err != nil {
		fmt.Printf("error reading config file: %v\n", err)
		os.Exit(1)
	}

	config, err := api.FromYaml(rawConfig)
	if err != nil {
		fmt.Printf("error parsing config file: %v\n", err)
		os.Exit(1)
	}

	if *prometheusUrl == "" {
		fmt.Println("missing prometheus base url, make sure to set --prometheus.url")
		os.Exit(1)
	}

	parsedPrometheusUrl, err := url.Parse(*prometheusUrl)
	if err != nil {
		fmt.Printf("error parsing prometheus base url: %v\n", err)
		os.Exit(1)
	}
	// Append the remote write endpoint to whatever path the base url already has.
	parsedPrometheusUrl.Path = path.Join(parsedPrometheusUrl.Path, *remoteWriteSuffix)

	requests, samples, err := precalculated.SchedulePrecalculatedRemoteWriteRequests(config, *batchSize)
	if err != nil {
		fmt.Printf("error sending samples: %v\n", err)
		os.Exit(1)
	}

	// ctx is cancelled as soon as one of the listed signals arrives; it is
	// passed to everything started below.
	ctx, stop := signal.NotifyContext(
		context.Background(),
		os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE, syscall.SIGABRT,
	)
	defer stop()

	remoteWriter, err := buildRemoteWriter(ctx, parsedPrometheusUrl)
	if err != nil {
		// Report like every other setup error instead of panicking with a
		// stack trace.
		log.Fatalf("error creating remote writer: %v", err)
	}

	if len(requests) > 0 {
		log.Printf("sending %v samples in %v requests (max batch size is %v)", samples, len(requests), *batchSize)
		for i, request := range requests {
			if err := remoteWriter.SendWriteRequest(request); err != nil {
				log.Fatalf("error sending batch: %v", err)
			}
			log.Printf("successfully sent batch %v/%v", i+1, len(requests))
		}
		log.Printf("done sending precalculated series")
	} else {
		log.Println("no precalculated series")
	}

	wg := &sync.WaitGroup{}
	if err := stream.StartStreamWriters(ctx, config, remoteWriter, wg); err != nil {
		log.Fatalf("error starting stream writer: %v", err)
	}

	if *proxyListen {
		if err := proxy.StartListener(ctx, proxyListenPort); err != nil {
			log.Fatalf("error starting proxy listener: %v", err)
		}
	}

	wg.Wait()
}
// buildRemoteWriter creates the remote writer that targets prometheusUrl.
//
// Without --oidc.enabled a plain writer is returned and the error is always
// nil. With --oidc.enabled an OIDC config is assembled from the --oidc.* flags
// and the writer is created with an OIDC transport bound to ctx; any error
// from that constructor is passed through to the caller.
func buildRemoteWriter(ctx context.Context, prometheusUrl *url.URL) (*remotewrite.RemoteWriter, error) {
	if !*oidcEnabled {
		return remotewrite.NewRemoteWriter(prometheusUrl), nil
	}

	cfg := internal.NewOIDCConfig(*oidcClientId, *oidcClientSecret, *oidcIssuerUrl, *oidcAudience)
	// NOTE(review): no result of Validate is inspected here; presumably it
	// reports invalid settings itself — confirm against internal.OIDCConfig.
	cfg.Validate()
	return remotewrite.NewRemoteWriterWithOIDCTransport(ctx, prometheusUrl, cfg)
}
func init() {
prometheusUrl = flag.String("prometheus.url", "", "prometheus base url")
configFile = flag.String("config.file", DefaultConfigFile, "config file location")
batchSize = flag.Int("batch.size", DefaultBatchSize, "max number of samples per remote write request")
printVersion = flag.Bool("version", false, "print version and exit")
proxyListen = flag.Bool("proxy.listen", false, "receive remote write requests")
proxyListenPort = flag.Int("proxy.listen.port", DefaultProxyListenPort, "port to receive remote write requests")
environment = flag.Bool("environment", false, "print environment setup script and exit")
initialize = flag.Bool("init", false, "print sample config file and exit")
oidcClientId = flag.String("oidc.clientId", "", "oidc client id")
oidcClientSecret = flag.String("oidc.clientSecret", "", "oidc client secret")
oidcIssuerUrl = flag.String("oidc.issuer", "", "oidc token issuer url")
oidcAudience = flag.String("oidc.audience", "", "oidc audience")
oidcEnabled = flag.Bool("oidc.enabled", false, "enable oidc token authentication")
remoteWriteSuffix = flag.String("prometheus.url.suffix", DefaultRemoteWriteEndpoint, "allows alternate remote write endpoints")
rules = flag.Bool("rules", false, "print sample alerting rules file and exit")
}