forked from project-flogo/contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection.go
147 lines (121 loc) · 4.36 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package kafka
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"
"github.com/Shopify/sarama"
"github.com/project-flogo/core/support/log"
)
// KafkaConnection groups a sarama consumer with the client configuration and
// broker list used to create it. Instances are built by getKafkaConnection.
type KafkaConnection struct {
// kafkaConfig is the sarama client configuration; getKafkaConnection enables
// TLS and SASL on it when the corresponding settings are present.
kafkaConfig *sarama.Config
// brokers holds the broker addresses that passed validateBrokerUrl.
brokers []string
// consumer is the sarama consumer created from brokers and kafkaConfig.
consumer sarama.Consumer
}
// Connection returns the sarama consumer held by this connection. The field is
// returned as-is, without checking whether it has been set.
func (c *KafkaConnection) Connection() sarama.Consumer {
return c.consumer
}
// Stop closes the underlying sarama consumer and returns whatever error its
// Close method reports.
//
// Stop is a no-op that returns nil when the receiver is nil or holds no
// consumer (for example a zero-value KafkaConnection), so it can be called
// unconditionally during shutdown without risking a nil dereference.
func (c *KafkaConnection) Stop() error {
	if c == nil || c.consumer == nil {
		return nil
	}
	return c.consumer.Close()
}
// getKafkaConnection builds a KafkaConnection from the supplied settings.
//
// It parses and validates the comma-separated settings.BrokerUrls list,
// enables TLS with the certificates loaded from settings.TrustStore when that
// setting is non-empty, enables SASL when settings.User is non-empty, and then
// creates a sarama consumer for the validated brokers.
//
// An error is returned when no broker is configured, a broker entry fails
// validateBrokerUrl, the truststore cannot be loaded, a user is supplied
// without a password, or sarama fails to create the consumer.
func getKafkaConnection(logger log.Logger, settings *Settings) (*KafkaConnection, error) {
	// strings.Split never returns an empty slice, so a missing broker list has
	// to be detected on the raw setting for this error to be reachable.
	if strings.TrimSpace(settings.BrokerUrls) == "" {
		return nil, fmt.Errorf("BrokerUrl [%s] is invalid, require at least one broker", settings.BrokerUrls)
	}

	brokerUrls := strings.Split(settings.BrokerUrls, ",")
	brokers := make([]string, 0, len(brokerUrls))
	for _, entry := range brokerUrls {
		// Tolerate lists written as "host1:9092, host2:9092"; stray whitespace
		// would otherwise end up inside the host name or the port.
		broker := strings.TrimSpace(entry)
		if err := validateBrokerUrl(broker); err != nil {
			return nil, fmt.Errorf("BrokerUrl [%s] format invalid for reason: [%v]", broker, err)
		}
		brokers = append(brokers, broker)
	}

	newConn := &KafkaConnection{
		kafkaConfig: sarama.NewConfig(),
		brokers:     brokers,
	}
	logger.Debugf("Kafka brokers: [%v]", brokers)

	//clientKeystore
	/*
		It's worth mentioning here that when the keystore for kafka is created it must support RSA keys via
		the -keyalg RSA option. If not then there will be ZERO overlap in supported cipher suites with java.
		see: https://issues.apache.org/jira/browse/KAFKA-3647
		for more info
	*/
	if settings.TrustStore != "" {
		trustPool, err := getCerts(logger, settings.TrustStore)
		if err != nil {
			return nil, err
		}
		// NOTE(review): InsecureSkipVerify turns off verification of the
		// broker's certificate chain and host name, so the RootCAs loaded from
		// the truststore are not actually enforced. The value is left unchanged
		// here to avoid breaking existing deployments; consider exposing it as
		// a setting that defaults to false.
		newConn.kafkaConfig.Net.TLS.Enable = true
		newConn.kafkaConfig.Net.TLS.Config = &tls.Config{
			RootCAs:            trustPool,
			InsecureSkipVerify: true,
		}
		logger.Debugf("Kafka initialized truststore from [%v]", settings.TrustStore)
	}

	// SASL
	if settings.User != "" {
		if len(settings.Password) == 0 {
			return nil, fmt.Errorf("password not provided for user: %s", settings.User)
		}
		newConn.kafkaConfig.Net.SASL.Enable = true
		newConn.kafkaConfig.Net.SASL.User = settings.User
		newConn.kafkaConfig.Net.SASL.Password = settings.Password
		logger.Debugf("Kafka SASL params initialized; user [%v]", settings.User)
	}

	kafkaConsumer, err := sarama.NewConsumer(brokers, newConn.kafkaConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kafka consumer for reason [%s]", err)
	}
	newConn.consumer = kafkaConsumer
	return newConn, nil
}
// validateBrokerUrl checks that broker meets the host:port definition of a
// Kafka host spec. Kafka calls it a URL, but it is really just host:port,
// which for numeric IP addresses is not technically a valid URI.
//
// The port is the text after the last ':' and must be an integer between 0 and
// 65535. A host that itself contains ':' is accepted only when it is enclosed
// in square brackets, e.g. "[::1]:9092"; any other use of extra colons is
// rejected.
//
// It returns nil when broker is well formed and a descriptive error otherwise.
func validateBrokerUrl(broker string) error {
	sep := strings.LastIndex(broker, ":")
	if sep < 0 {
		return fmt.Errorf("BrokerUrl must be composed of sections like \"host:port\"")
	}
	host, port := broker[:sep], broker[sep+1:]

	// Extra colons are only legal inside a bracketed IPv6 literal.
	bracketed := strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]")
	if strings.Contains(host, ":") && !bracketed {
		return fmt.Errorf("BrokerUrl must be composed of sections like \"host:port\"")
	}

	i, err := strconv.Atoi(port)
	if err != nil || i < 0 || i > 65535 {
		return fmt.Errorf("port specification [%s] is not numeric and between 0 and 65535", port)
	}
	return nil
}
// getCerts loads the PEM certificates found in the trustStore directory into a
// new certificate pool.
//
// trustStore must name a directory. Every non-directory entry in it is read
// and any PEM certificates it contains are appended to the pool. Entries that
// cannot be read, or that contain no usable certificate, are logged as
// warnings and skipped.
//
// A non-nil error is returned when trustStore cannot be inspected, is not a
// directory, cannot be listed or is empty, or yields no valid certificate at
// all. The pool is still returned alongside the error, but callers should not
// use it in that case.
func getCerts(logger log.Logger, trustStore string) (*x509.CertPool, error) {
	certPool := x509.NewCertPool()

	fileInfo, err := os.Stat(trustStore)
	if err != nil {
		if os.IsNotExist(err) {
			return certPool, fmt.Errorf("Truststore [%s] does not exist", trustStore)
		}
		// Do not report e.g. a permission problem as a missing path.
		return certPool, fmt.Errorf("Truststore [%s] cannot be accessed: %v", trustStore, err)
	}
	if !fileInfo.IsDir() {
		return certPool, fmt.Errorf("TrustStore [%s] is not a directory. Must be a directory containing trusted certificates in PEM format",
			trustStore)
	}

	trustedCertFiles, err := ioutil.ReadDir(trustStore)
	if err != nil || len(trustedCertFiles) == 0 {
		return certPool, fmt.Errorf("failed to read trusted certificates from [%s] Must be a directory containing trusted certificates in PEM format", trustStore)
	}

	// Count files that contributed at least one certificate instead of
	// inspecting the pool afterwards.
	loaded := 0
	for _, trustCertFile := range trustedCertFiles {
		if trustCertFile.IsDir() {
			// Sub-directories are not certificate files; skip them quietly.
			continue
		}
		fqfName := fmt.Sprintf("%s%c%s", trustStore, os.PathSeparator, trustCertFile.Name())
		trustCertBytes, err := ioutil.ReadFile(fqfName)
		if err != nil {
			logger.Warnf("Failed to read trusted certificate [%s] ... continuing", trustCertFile.Name())
			continue
		}
		if certPool.AppendCertsFromPEM(trustCertBytes) {
			loaded++
		} else {
			logger.Warnf("No valid PEM certificate found in [%s] ... continuing", trustCertFile.Name())
		}
	}

	if loaded == 0 {
		return certPool, fmt.Errorf("failed to read trusted certificates from [%s] After processing all files in the directory no valid trusted certs were found", trustStore)
	}
	return certPool, nil
}