-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.go
186 lines (175 loc) · 6.32 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package main
import (
"context"
_ "embed"
"flag"
"github.com/celerway/metamorphosis/bridge"
"github.com/celerway/metamorphosis/log"
"github.com/joho/godotenv"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"time"
)
//go:embed .version
var embeddedVersion string
func main() {
var ( // default settings:
logLevelStr string
mqttBroker string
mqttPort int = 8883
mqttTopic string
mqttTls bool = true
mqttClientId string
caRootCertFile string
mqttCaClientCertFile string
mqttCaClientKeyFile string
kafkaBroker string
kafkaPort int = 9092
kafkaTopic string
healthPort int = 8080
kafkaRetryInterval int = 3
kafkaInterval int = 5
kafkaBatchSize int = 1000
kafkaMaxBatchSize int = 8000
kafkaTestTopic string = ""
)
err := godotenv.Load()
log.Infof("Metamorphosis %s starting up.", embeddedVersion)
if err != nil {
log.Infof("Error loading .env file, assuming production: %s", err.Error())
}
flag.StringVar(&logLevelStr, "log-level",
LookupEnvOrString("LOG_LEVEL", logLevelStr), "Log level (trace|debug|info|warn|error")
flag.StringVar(&caRootCertFile, "root-ca",
LookupEnvOrString("ROOT_CA", caRootCertFile), "Path to root CA certificate (pubkey)")
flag.StringVar(&mqttCaClientCertFile, "mqtt-client-cert",
LookupEnvOrString("MQTT_CLIENT_CERT", mqttCaClientCertFile), "Path to client cert (pubkey)")
flag.StringVar(&mqttCaClientKeyFile, "mqtt-client-key",
LookupEnvOrString("MQTT_CLIENT_KEY", mqttCaClientKeyFile), "Path to client key (privkey)")
flag.BoolVar(&mqttTls, "mqtt-tls",
LookupEnvOrBool("MQTT_TLS", mqttTls), "Tls (true|false)")
flag.StringVar(&mqttBroker, "mqtt-broker",
LookupEnvOrString("MQTT_BROKER", mqttBroker), "MQTT broker hostname")
flag.IntVar(&mqttPort, "mqtt-port",
LookupEnvOrInt("MQTT_PORT", mqttPort), "Mqtt broker port.")
flag.StringVar(&mqttTopic, "mqtt-topic",
LookupEnvOrString("MQTT_TOPIC", mqttTopic), "MQTT topic to listen to (wildcards ok)")
flag.StringVar(&mqttClientId, "mqtt-client-id",
LookupEnvOrString("MQTT_CLIENT_ID", mqttClientId), "MQTT client id")
flag.StringVar(&kafkaBroker, "kafka-broker",
LookupEnvOrString("KAFKA_BROKER", kafkaBroker), "Kafka broker hostname")
flag.IntVar(&kafkaPort, "kakfa-port",
LookupEnvOrInt("KAFKA_PORT", kafkaPort), "Kafka broker port")
flag.StringVar(&kafkaTopic, "kafka-topic",
LookupEnvOrString("KAFKA_TOPIC", kafkaTopic), "Kafka topic to write to")
flag.IntVar(&kafkaRetryInterval, "kafka-retry-interval",
LookupEnvOrInt("KAFKA_RETRY_INTERVAL", kafkaRetryInterval), "Kafka retry interval in case of failure (seconds)")
flag.IntVar(&healthPort, "health-port",
LookupEnvOrInt("HEALTH_PORT", healthPort), "HTTP port for healthz and prometheus")
flag.IntVar(&kafkaBatchSize, "kafka-batch-size",
LookupEnvOrInt("KAFKA_BATCH_SIZE", kafkaBatchSize), "Kafka batch size")
flag.IntVar(&kafkaMaxBatchSize, "kafka-max-batch-size",
LookupEnvOrInt("KAFKA_MAX_BATCH_SIZE", kafkaMaxBatchSize), "Kafka MAX batch size (used when un-spooling after failure)")
flag.IntVar(&kafkaInterval, "kafka-interval",
LookupEnvOrInt("KAFKA_INTERVAL", kafkaInterval), "Kafka interval. How often a write is triggered (seconds)")
flag.StringVar(&kafkaTestTopic, "kafka-test-topic",
LookupEnvOrString("KAFKA_TEST_TOPIC", kafkaTestTopic), "Initial test message will be sent to this kafka topic. No initial test will be done if this isn't set.")
flag.Parse()
var logLevel log.LogLevel
if logLevelStr != "" {
logLevel, err = log.ParseLogLevel(logLevelStr)
if err != nil {
log.Fatalf("Invalid log level: %s", logLevelStr)
}
log.Infof("Setting log level to %s", logLevel.String())
log.SetLevel(logLevel)
}
// check if MQTT client id is set, if not, use hostname as the ID
if mqttClientId == "" {
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("Error getting hostname: %s", err.Error())
}
// chop off the domain name
hostname = strings.Split(hostname, ".")[0]
mqttClientId = hostname
}
// Check if the client_id contains %h, if so, replace it with the hostname
if strings.Contains(mqttClientId, "%h") {
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("Error getting hostname: %s", err.Error())
}
// chop off the domain name
hostname = strings.Split(hostname, ".")[0]
mqttClientId = strings.Replace(mqttClientId, "%h", hostname, -1)
}
log.Infof("MQTT client id set to %s", mqttClientId)
if mqttTls {
CheckSet(caRootCertFile, "ROOT_CA", "tls is enabled")
CheckSet(mqttCaClientCertFile, "MQTT_CLIENT_CERT", "tls is enabled")
CheckSet(mqttCaClientKeyFile, "MQTT_CLIENT_KEY", "tls is enabled")
}
runConfig := bridge.Params{
MqttBroker: mqttBroker,
MqttPort: mqttPort,
MqttTopic: mqttTopic,
MqttTls: mqttTls,
MqttClientId: mqttClientId,
TlsRootCrtFile: caRootCertFile,
MqttClientCertFile: mqttCaClientCertFile,
MqttClientKeyFile: mqttCaClientKeyFile,
KafkaBroker: kafkaBroker,
KafkaPort: kafkaPort,
KafkaTopic: kafkaTopic,
KafkaRetryInterval: time.Duration(kafkaRetryInterval) * time.Second,
KafkaInterval: time.Duration(kafkaInterval) * time.Second,
KafkaBatchSize: kafkaBatchSize,
KafkaMaxBatchSize: kafkaMaxBatchSize,
HealthPort: healthPort,
KafkaTestTopic: kafkaTestTopic,
LogLevel: logLevel,
}
log.Infof("Startup options: %v", runConfig)
log.Debug("Starting bridge")
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
bridge.Run(ctx, runConfig)
}()
wg.Wait()
log.Debug("Waiting over. Exiting.")
}
func CheckSet(s, name, reason string) {
if s == "" {
log.Fatalf("%s can't be empty when %s", name, reason)
}
}
func LookupEnvOrString(key string, defaultVal string) string {
if val, ok := os.LookupEnv(key); ok {
return val
}
return defaultVal
}
func LookupEnvOrInt(key string, defaultVal int) int {
if val, ok := os.LookupEnv(key); ok {
v, err := strconv.Atoi(val)
if err != nil {
log.Fatalf("LookupEnvOrInt[%s]: %v", key, err)
}
return v
}
return defaultVal
}
func LookupEnvOrBool(key string, defaultVal bool) bool {
if val, ok := os.LookupEnv(key); ok {
return strings.ToUpper(val) == "TRUE"
}
return defaultVal
}