Skip to content

Commit

Permalink
Tracing: Add credentialsSecret for basic authentication to remote end…
Browse files Browse the repository at this point in the history
…point

An additional field credentialsSecret is added to the tracing configmap. The username and password provided in this secret will be used to authenticate against collector endpoint. This allows users to connect to third party opentelemetry compatible collectors.

Signed-off-by: Jayadeep KM <[email protected]>
  • Loading branch information
kmjayadeep committed Oct 22, 2023
1 parent 0111021 commit 8de2c3e
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 49 deletions.
2 changes: 2 additions & 0 deletions config/config-tracing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ data:
# API endpoint to send the traces to
# (optional): The default value is given below
endpoint: "http://jaeger-collector.jaeger.svc.cluster.local:14268/api/traces"
# (optional) Name of the k8s secret which contains basic auth credentials
credentialsSecret: "jaeger-creds"
7 changes: 0 additions & 7 deletions config/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,6 @@ spec:
value: /etc/ssl/certs
- name: METRICS_DOMAIN
value: tekton.dev/pipeline
# The following variables can be uncommented with correct values to enable Jaeger tracing
#- name: OTEL_EXPORTER_JAEGER_ENDPOINT
# value: http://jaeger-collector.jaeger:14268/api/traces
#- name: OTEL_EXPORTER_JAEGER_USER
# value: username
#- name: OTEL_EXPORTER_JAEGER_PASSWORD
# value: password
securityContext:
allowPrivilegeEscalation: false
capabilities:
Expand Down
6 changes: 1 addition & 5 deletions docs/developers/tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,4 @@ The configmap `config/config-tracing.yaml` contains the configuration for tracin

* enabled: Set this to true to enable tracing
* endpoint: API endpoint for jaeger collector to send the traces. By default the endpoint is configured to be `http://jaeger-collector.jaeger.svc.cluster.local:14268/api/traces`.

Tekton pipelines controller also supports the following additional environment variables to be able to connect to jaeger:

* `OTEL_EXPORTER_JAEGER_USER` is the username to be sent as authentication to the collector endpoint.
* `OTEL_EXPORTER_JAEGER_PASSWORD` is the password to be sent as authentication to the collector endpoint.
* credentialsSecret: Name of the secret which contains `username` and `password` to authenticate against the endpoint
15 changes: 12 additions & 3 deletions pkg/apis/config/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
// tracingEndpintKey is the configmap key for tracing api endpoint
tracingEndpointKey = "endpoint"

// tracingCredentialsSecretKey is the name of the secret which contains credentials for tracing endpoint
tracingCredentialsSecretKey = "credentialsSecret"

// DefaultEndpoint is the default destination for sending traces
DefaultEndpoint = "http://jaeger-collector.jaeger.svc.cluster.local:14268/api/traces"
)
Expand All @@ -40,8 +43,9 @@ var DefaultTracing, _ = newTracingFromMap(map[string]string{})
// Tracing holds the configurations for tracing
// +k8s:deepcopy-gen=true
type Tracing struct {
Enabled bool
Endpoint string
Enabled bool
Endpoint string
CredentialsSecret string
}

// Equals returns true if two Configs are identical
Expand All @@ -55,7 +59,8 @@ func (cfg *Tracing) Equals(other *Tracing) bool {
}

return other.Enabled == cfg.Enabled &&
other.Endpoint == cfg.Endpoint
other.Endpoint == cfg.Endpoint &&
other.CredentialsSecret == cfg.CredentialsSecret
}

// GetTracingConfigName returns the name of the configmap containing all
Expand All @@ -78,6 +83,10 @@ func newTracingFromMap(config map[string]string) (*Tracing, error) {
t.Endpoint = endpoint
}

if secret, ok := config[tracingCredentialsSecretKey]; ok {
t.CredentialsSecret = secret
}

if enabled, ok := config[tracingEnabledKey]; ok {
e, err := strconv.ParseBool(enabled)
if err != nil {
Expand Down
18 changes: 16 additions & 2 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ import (
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
resolution "github.com/tektoncd/pipeline/pkg/resolution/resource"
"github.com/tektoncd/pipeline/pkg/tracing"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/clock"
kubeclient "knative.dev/pkg/client/injection/kube/client"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -59,9 +61,10 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
pipelineRunInformer := pipelineruninformer.Get(ctx)
resolutionInformer := resolutioninformer.Get(ctx)
verificationpolicyInformer := verificationpolicyinformer.Get(ctx)
tracerProvider := tracing.New(TracerProviderName)
secretinformer := secretinformer.Get(ctx)
tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing"))
//nolint:contextcheck // OnStore methods does not support context as a parameter
configStore := config.NewStore(logger.Named("config-store"), pipelinerunmetrics.MetricsOnStore(logger), tracerProvider.OnStore(logger))
configStore := config.NewStore(logger.Named("config-store"), pipelinerunmetrics.MetricsOnStore(logger), tracerProvider.OnStore(secretinformer.Lister()))
configStore.WatchConfigs(cmw)

c := &Reconciler{
Expand All @@ -86,6 +89,17 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
}
})

if _, err := secretinformer.Informer().AddEventHandler(controller.HandleAll(func(obj interface{}) {
secret, ok := obj.(*corev1.Secret)
if !ok {
logger.Error("Failed to do type assertion for Secret")
return
}
tracerProvider.OnSecret(secret)
})); err != nil {
logging.FromContext(ctx).Panicf("Couldn't register Secret informer event handler: %w", err)
}

if _, err := pipelineRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)); err != nil {
logging.FromContext(ctx).Panicf("Couldn't register PipelineRun informer event handler: %w", err)
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ import (
"github.com/tektoncd/pipeline/pkg/spire"
"github.com/tektoncd/pipeline/pkg/taskrunmetrics"
"github.com/tektoncd/pipeline/pkg/tracing"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/clock"
kubeclient "knative.dev/pkg/client/injection/kube/client"
limitrangeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/limitrange"
filteredpodinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/logging"
)

Expand All @@ -61,10 +63,11 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
limitrangeInformer := limitrangeinformer.Get(ctx)
verificationpolicyInformer := verificationpolicyinformer.Get(ctx)
resolutionInformer := resolutioninformer.Get(ctx)
secretinformer := secretinformer.Get(ctx)
spireClient := spire.GetControllerAPIClient(ctx)
tracerProvider := tracing.New(TracerProviderName)
tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing"))
//nolint:contextcheck // OnStore methods does not support context as a parameter
configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger), spire.OnStore(ctx, logger), tracerProvider.OnStore(logger))
configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger), spire.OnStore(ctx, logger), tracerProvider.OnStore(secretinformer.Lister()))
configStore.WatchConfigs(cmw)

entrypointCache, err := pod.NewEntrypointCache(kubeclientset)
Expand Down Expand Up @@ -96,6 +99,17 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
}
})

if _, err := secretinformer.Informer().AddEventHandler(controller.HandleAll(func(obj interface{}) {
secret, ok := obj.(*corev1.Secret)
if !ok {
logger.Error("Failed to do type assertion for Secret")
return
}
tracerProvider.OnSecret(secret)
})); err != nil {
logging.FromContext(ctx).Panicf("Couldn't register Secret informer event handler: %w", err)
}

if _, err := taskRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)); err != nil {
logging.FromContext(ctx).Panicf("Couldn't register TaskRun informer event handler: %w", err)
}
Expand Down
96 changes: 73 additions & 23 deletions pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,69 +28,119 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"knative.dev/pkg/system"
)

type tracerProvider struct {
service string
provider trace.TracerProvider
cfg *config.Tracing
username string
password string
logger *zap.SugaredLogger
}

func init() {
otel.SetTextMapPropagator(propagation.TraceContext{})
}

// New returns a new instance of tracerProvider for the given service
func New(service string) *tracerProvider {
func New(service string, logger *zap.SugaredLogger) *tracerProvider {
return &tracerProvider{
service: service,
provider: trace.NewNoopTracerProvider(),
logger: logger,
}
}

// OnStore configures tracerProvider dynamically
func (t *tracerProvider) OnStore(logger *zap.SugaredLogger) func(name string, value interface{}) {
func (t *tracerProvider) OnStore(lister listerv1.SecretLister) func(name string, value interface{}) {
return func(name string, value interface{}) {
if name == config.GetTracingConfigName() {
cfg, ok := value.(*config.Tracing)
if !ok {
logger.Error("Failed to do type assertion for extracting TRACING config")
return
}
if name != config.GetTracingConfigName() {
return
}

if cfg.Equals(t.cfg) {
logger.Info("Tracing config unchanged", cfg, t.cfg)
return
}
t.cfg = cfg
cfg, ok := value.(*config.Tracing)
if !ok {
t.logger.Error("tracing configmap is in invalid format. value: %v", value)
return
}

tp, err := createTracerProvider(t.service, cfg)
if cfg.Equals(t.cfg) {
t.logger.Info("tracing config unchanged", cfg, t.cfg)
return
}
t.cfg = cfg

if lister != nil && cfg.CredentialsSecret != "" {
sec, err := lister.Secrets(system.Namespace()).Get(cfg.CredentialsSecret)
if err != nil {
logger.Errorf("Unable to initialize tracing with error : %v", err.Error())
t.logger.Errorf("unable to initialize tracing with error : %v", err.Error())
return
}
logger.Info("Initialized Tracer Provider")
if p, ok := t.provider.(*tracesdk.TracerProvider); ok {
if err := p.Shutdown(context.Background()); err != nil {
logger.Errorf("Unable to shutdown tracingprovider with error : %v", err.Error())
}
}
t.provider = tp
creds := sec.Data
t.username = string(creds["username"])
t.password = string(creds["password"])
} else {
t.username = ""
t.password = ""
}

t.reinitialize()
}
}

func (t *tracerProvider) Tracer(name string, options ...trace.TracerOption) trace.Tracer {
return t.provider.Tracer(name, options...)
}

func createTracerProvider(service string, cfg *config.Tracing) (trace.TracerProvider, error) {
func (t *tracerProvider) OnSecret(secret *corev1.Secret) {
if secret.Name != t.cfg.CredentialsSecret {
return
}

creds := secret.Data
username := string(creds["username"])
password := string(creds["password"])

if t.username == username && t.password == password {
// No change in credentials, no need to reinitialize
return
}
t.username = username
t.password = password

t.logger.Debugf("tracing credentials updated, reinitializing tracingprovider with secret: %v", secret.Name)

t.reinitialize()
}

func (t *tracerProvider) reinitialize() {
tp, err := createTracerProvider(t.service, t.cfg, t.username, t.password)
if err != nil {
t.logger.Errorf("unable to initialize tracing with error : %v", err.Error())
return
}
t.logger.Info("initialized Tracer Provider")
if p, ok := t.provider.(*tracesdk.TracerProvider); ok {
if err := p.Shutdown(context.Background()); err != nil {
t.logger.Errorf("unable to shutdown tracingprovider with error : %v", err.Error())
}
}
t.provider = tp
}

func createTracerProvider(service string, cfg *config.Tracing, user, pass string) (trace.TracerProvider, error) {
if !cfg.Enabled {
return trace.NewNoopTracerProvider(), nil
}

exp, err := jaeger.New(jaeger.WithCollectorEndpoint(
jaeger.WithEndpoint(cfg.Endpoint),
jaeger.WithUsername(user),
jaeger.WithPassword(pass),
))
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 8de2c3e

Please sign in to comment.