Skip to content

Commit

Permalink
feat(GPX-669): Add TLS support for datasources (#66)
Browse files Browse the repository at this point in the history
+ added support for loading certificates for loki and thanos
+ restructured configmap to enhance readability and ease of use
  • Loading branch information
Lucostus authored Jul 17, 2023
1 parent 0452bf9 commit af2fc54
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 79 deletions.
49 changes: 33 additions & 16 deletions configs/config.yaml
Original file line number Diff line number Diff line change
@@ -1,26 +1,43 @@
proxy:
log_level: INFO
provider: configmap
thanos_url: https://localhost:3000
loki_url: https://localhost:3000
jwks_cert_url: https://sso.example.com/realms/internal/protocol/openid-connect/certs
admin_group: gepardec-run-admins
insecure_skip_verify: false
log:
level: DEBUG
log_tokens: false
port: 8080

tenant_provider: configmap

web:
proxy_port: 8080
metrics_port: 8081
host: localhost
tenant_labels:
thanos: namespace
loki: kubernetes_namespace_name
insecure_skip_verify: true
trusted_root_ca_path: "./certs/"
jwks_cert_url: https://example.com/realms/internal/protocol/openid-connect/certs

admin:
bypass: true
group: gepardec-run-admins

dev:
enabled: false
username: example
service_account_token: "fake"

db:
enabled: false
user: multitenant
password_path: "."
host: localhost
port: 3306
dbName: example
query: "SELECT * FROM users WHERE username = ?"
dev:
enabled: false
username: example
service_account_token: "fake"

thanos:
url: https://localhost:9091
tenant_label: namespace
cert: "./certs/thanos/tls.crt"
key: "./certs/thanos/tls.key"

loki:
url: https://localhost:3100
tenant_label: kubernetes_namespace_name
cert: "./certs/loki/tls.crt"
key: "./certs/loki/tls.key"
4 changes: 2 additions & 2 deletions enforcer_logql.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func logqlEnforcer(query string, tenantLabels map[string]bool) (string, error) {
func matchNamespaceMatchers(queryMatches []*labels.Matcher, tenantLabels map[string]bool) ([]*labels.Matcher, error) {
foundNamespace := false
for _, match := range queryMatches {
if match.Name == Cfg.Proxy.TenantLabels.Loki {
if match.Name == Cfg.Loki.TenantLabel {
foundNamespace = true
queryLabels := strings.Split(match.Value, "|")
for _, queryLabel := range queryLabels {
Expand All @@ -78,7 +78,7 @@ func matchNamespaceMatchers(queryMatches []*labels.Matcher, tenantLabels map[str

queryMatches = append(queryMatches, &labels.Matcher{
Type: matchType,
Name: Cfg.Proxy.TenantLabels.Loki,
Name: Cfg.Loki.TenantLabel,
Value: strings.Join(MapKeysToArray(tenantLabels), "|"),
})
}
Expand Down
8 changes: 4 additions & 4 deletions enforcer_promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func promqlEnforcer(query string, allowedTenantLabels map[string]bool) (string,
operator = "=~"
}
query = fmt.Sprintf("{%s%s\"%s\"}",
Cfg.Proxy.TenantLabels.Thanos,
Cfg.Thanos.TenantLabel,
operator,
strings.Join(MapKeysToArray(allowedTenantLabels),
"|"))
Expand Down Expand Up @@ -94,7 +94,7 @@ func extractLabelsAndValues(expr parser.Expr) (map[string]string, error) {
// against allowed tenant labels. If the check fails, it returns an error. If no label matches
// the Thanos tenant label, it returns all allowed tenant labels.
func enforceLabels(queryLabels map[string]string, allowedTenantLabels map[string]bool) ([]string, error) {
if _, ok := queryLabels[Cfg.Proxy.TenantLabels.Thanos]; ok {
if _, ok := queryLabels[Cfg.Thanos.TenantLabel]; ok {
ok, tenantLabels := checkLabels(queryLabels, allowedTenantLabels)
if !ok {
return nil, fmt.Errorf("user not allowed with namespace %s", tenantLabels[0])
Expand All @@ -109,7 +109,7 @@ func enforceLabels(queryLabels map[string]string, allowedTenantLabels map[string
// in allowed tenant labels, it returns false along with the query label. If all query labels exist
// in allowed tenant labels, it returns true along with the query labels.
func checkLabels(queryLabels map[string]string, allowedTenantLabels map[string]bool) (bool, []string) {
splitQueryLabels := strings.Split(queryLabels[Cfg.Proxy.TenantLabels.Thanos], "|")
splitQueryLabels := strings.Split(queryLabels[Cfg.Thanos.TenantLabel], "|")
for _, queryLabel := range splitQueryLabels {
_, ok := allowedTenantLabels[queryLabel]
if !ok {
Expand All @@ -132,7 +132,7 @@ func createEnforcer(tenantLabels []string) *enforcer.Enforcer {
}

return enforcer.NewEnforcer(true, &labels.Matcher{
Name: Cfg.Proxy.TenantLabels.Thanos,
Name: Cfg.Thanos.TenantLabel,
Type: matchType,
Value: strings.Join(tenantLabels, "|"),
})
Expand Down
101 changes: 80 additions & 21 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"crypto/tls"
"crypto/x509"
"database/sql"
"encoding/json"
"fmt"
Expand All @@ -12,6 +13,7 @@ import (
"go.uber.org/zap"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"time"
Expand All @@ -31,42 +33,38 @@ var (
// configures the HTTP client to ignore self-signed certificates, reads the service account token,
// initializes JWKS if not in development mode, and establishes a database connection if enabled in the config.
func init() {
InitConfig()
InitLogging()
initConfig()
initLogging()
Logger.Info("-------Init Proxy-------")
Logger.Info("Commit: ", zap.String("commit", Commit))
Logger.Info("Set http client to ignore self signed certificates")
Logger.Info("Config ", zap.Any("cfg", Cfg))
initTLSConfig()
ServiceAccountToken = Cfg.Dev.ServiceAccountToken
if !strings.HasSuffix(os.Args[0], ".test") {
Logger.Debug("Not in test mode")
InitJWKS()
initJWKS()
if !Cfg.Dev.Enabled {
sa, err := os.ReadFile("/run/secrets/kubernetes.io/serviceaccount/token")
if err != nil {
Logger.Panic("Error while reading service account token", zap.Error(err))
}
ServiceAccountToken = string(sa)
}

}

if Cfg.Db.Enabled {
InitDB()
}

if Cfg.Proxy.InsecureSkipVerify {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
initDB()
}
Logger.Info("------Init Complete------")
}

// InitConfig initializes the configuration from the files `config` and `labels` using Viper.
func InitConfig() {
// initConfig initializes the configuration from the files `config` and `labels` using Viper.
func initConfig() {
Cfg = &Config{}
V = viper.NewWithOptions(viper.KeyDelimiter("::"))
loadConfig("config")
if Cfg.Proxy.Provider == "configmap" {
if Cfg.TenantProvider == "configmap" {
loadConfig("labels")
}
}
Expand All @@ -77,7 +75,7 @@ func onConfigChange(e fsnotify.Event) {
//Todo: change log level on reload
Cfg = &Config{}
var configs []string
if Cfg.Proxy.Provider == "configmap" {
if Cfg.TenantProvider == "configmap" {
configs = []string{"config", "labels"}
} else {
configs = []string{"config"}
Expand All @@ -96,6 +94,8 @@ func onConfigChange(e fsnotify.Event) {
}
fmt.Printf("{\"level\":\"info\",\"config\":\"%+v/\"}", Cfg)
fmt.Printf("{\"level\":\"info\",\"message\":\"Config file changed: %s/\"}", e.Name)
initTLSConfig()
initJWKS()
}

// loadConfig loads the configuration from the specified file. It looks for the config file
Expand All @@ -118,10 +118,10 @@ func loadConfig(configName string) {
V.WatchConfig()
}

// InitLogging initializes the logger based on the log level specified in the config file.
func InitLogging() *zap.Logger {
// initLogging initializes the logger based on the log level specified in the config file.
func initLogging() *zap.Logger {
rawJSON := []byte(`{
"level": "` + strings.ToLower(Cfg.Proxy.LogLevel) + `",
"level": "` + strings.ToLower(Cfg.Log.Level) + `",
"encoding": "json",
"outputPaths": ["stdout"],
"errorOutputPaths": ["stdout"],
Expand All @@ -145,11 +145,70 @@ func InitLogging() *zap.Logger {
return Logger
}

// InitJWKS initializes the JWKS (JSON Web Key Set) from a specified URL. It sets up the refresh parameters
func initTLSConfig() {
rootCAs, _ := x509.SystemCertPool()
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}

if Cfg.Web.TrustedRootCaPath != "" {
err := filepath.Walk(Cfg.Web.TrustedRootCaPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() || strings.Contains(info.Name(), "..") {
return nil
}

certs, err := os.ReadFile(path)
if err != nil {
Logger.Error("Error while reading trusted CA", zap.Error(err))
return err
}
Logger.Debug("Adding trusted CA", zap.String("path", path))
certs = append(certs, []byte("\n")...)
rootCAs.AppendCertsFromPEM(certs)

return nil
})

if err != nil {
Logger.Error("Error while traversing directory", zap.Error(err))
}
}

var certificates []tls.Certificate

lokiCert, err := tls.LoadX509KeyPair(Cfg.Loki.Cert, Cfg.Loki.Key)
if err != nil {
Logger.Error("Error while loading loki certificate", zap.Error(err))
} else {
Logger.Debug("Adding Loki certificate", zap.String("path", Cfg.Loki.Cert))
certificates = append(certificates, lokiCert)
}

thanosCert, err := tls.LoadX509KeyPair(Cfg.Thanos.Cert, Cfg.Thanos.Key)
if err != nil {
Logger.Error("Error while loading thanos certificate", zap.Error(err))
} else {
Logger.Debug("Adding Thanos certificate", zap.String("path", Cfg.Loki.Cert))
certificates = append(certificates, thanosCert)
}

config := &tls.Config{
InsecureSkipVerify: Cfg.Web.InsecureSkipVerify,
RootCAs: rootCAs,
Certificates: certificates,
}

http.DefaultTransport.(*http.Transport).TLSClientConfig = config
}

// initJWKS initializes the JWKS (JSON Web Key Set) from a specified URL. It sets up the refresh parameters
// for the JWKS and handles any errors that occur during the refresh.
func InitJWKS() {
func initJWKS() {
Logger.Info("Init Keycloak config")
jwksURL := Cfg.Proxy.JwksCertURL
jwksURL := Cfg.Web.JwksCertURL

options := keyfunc.Options{
RefreshErrorHandler: func(err error) {
Expand All @@ -172,10 +231,10 @@ func InitJWKS() {
Logger.Info("Finished Keycloak config")
}

// InitDB establishes a connection to the database if the `Db.Enabled` configuration setting is `true`.
// initDB establishes a connection to the database if the `Db.Enabled` configuration setting is `true`.
// It reads the database password from a file, sets up the database connection configuration,
// and opens the database connection.
func InitDB() {
func initDB() {
password, err := os.ReadFile(Cfg.Db.PasswordPath)
if err != nil {
Logger.Panic("Could not read db password", zap.Error(err))
Expand Down
21 changes: 10 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func main() {
mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))

go func() {
if err := http.ListenAndServe(fmt.Sprintf("%s:%d", Cfg.Proxy.Host, Cfg.Proxy.MetricsPort), mux); err != nil {
if err := http.ListenAndServe(fmt.Sprintf("%s:%d", Cfg.Web.Host, Cfg.Web.MetricsPort), mux); err != nil {
Logger.Panic("Error while serving metrics", zap.Error(err))
}
}()
err := http.ListenAndServe(fmt.Sprintf("%s:%d", Cfg.Proxy.Host, Cfg.Proxy.Port),
err := http.ListenAndServe(fmt.Sprintf("%s:%d", Cfg.Web.Host, Cfg.Web.ProxyPort),
std.Handler("/", mdlw, http.HandlerFunc(reverseProxy)))

if err != nil {
Expand Down Expand Up @@ -81,12 +81,12 @@ func reverseProxy(rw http.ResponseWriter, req *http.Request) {
}
query := req.URL.Query().Get(urlKey)

upstreamUrl, err = url.Parse(Cfg.Proxy.ThanosUrl)
upstreamUrl, err = url.Parse(Cfg.Thanos.URL)
enforceFunc = promqlEnforcer
Logger.Debug("Parsed Thanos URL")

if containsLoki(req.URL.Path) {
upstreamUrl, err = url.Parse(Cfg.Proxy.LokiUrl)
upstreamUrl, err = url.Parse(Cfg.Loki.URL)
enforceFunc = logqlEnforcer
Logger.Debug("Parsed Loki URL")
}
Expand All @@ -97,7 +97,6 @@ func reverseProxy(rw http.ResponseWriter, req *http.Request) {
}

logRequest(req)
Logger.Debug("url request", zap.String("url", req.URL.String()))

if !hasAuthorizationHeader(req) {
logAndWriteErrorMsg(rw, "No Authorization header found", http.StatusForbidden, nil)
Expand Down Expand Up @@ -133,7 +132,7 @@ func reverseProxy(rw http.ResponseWriter, req *http.Request) {
Logger.Debug("Development mode enabled, set preferred username")
}

switch provider := Cfg.Proxy.Provider; provider {
switch provider := Cfg.TenantProvider; provider {
case "mysql":
tenantLabels = GetLabelsFromDB(keycloakToken.Email)
Logger.Debug("Fetched labels from MySQL")
Expand Down Expand Up @@ -217,7 +216,7 @@ func isValidToken(token *jwt.Token) bool {

// isAdminSkip checks if a user belongs to the admin group. It can bypass some checks for admin users.
func isAdminSkip(token KeycloakToken) bool {
return ContainsIgnoreCase(token.Groups, Cfg.Proxy.AdminGroup) || ContainsIgnoreCase(token.ApaGroupsOrg, Cfg.Proxy.AdminGroup)
return (ContainsIgnoreCase(token.Groups, Cfg.Admin.Group) || ContainsIgnoreCase(token.ApaGroupsOrg, Cfg.Admin.Group)) && Cfg.Admin.Bypass
}

func containsApiV1Labels(s string) bool {
Expand Down Expand Up @@ -251,7 +250,7 @@ func logRequest(req *http.Request) {

// Restore the io.ReadCloser to its original state
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
if !Cfg.Proxy.LogTokens {
if !Cfg.Log.LogTokens {
bodyBytes = []byte("[REDACTED]")
}

Expand All @@ -267,14 +266,14 @@ func logRequest(req *http.Request) {
Body: string(bodyBytes),
}

if !Cfg.Proxy.LogTokens {
// Make a copy of the header map so we're not modifying the original
if !Cfg.Log.LogTokens {
copyHeader := make(http.Header)
for k, v := range requestData.Header {
copyHeader[k] = v
}
copyHeader.Del("Authorization")
copyHeader.Del("X-Plugin-Id")
copyHeader.Del("X-Id-Token")
requestData.Header = copyHeader
}

Expand All @@ -283,7 +282,7 @@ func logRequest(req *http.Request) {
Logger.Error("Error while marshalling request", zap.Error(err))
return
}
Logger.Debug("Request", zap.String("request", string(jsonData)))
Logger.Debug("Request", zap.String("request", string(jsonData)), zap.String("path", req.URL.Path))
}

// parseJwtToken parses a JWT token string into a Keycloak token and a JWT token. It returns an error if parsing fails.
Expand Down
Loading

0 comments on commit af2fc54

Please sign in to comment.