Skip to content

Commit

Permalink
Tracing: Add credentialsSecret for basic authentication to remote end…
Browse files Browse the repository at this point in the history
…point

An additional field credentialsSecret is added to the tracing configmap. The username and password provided in this secret will be used to authenticate against collector endpoint. This allows users to connect to third party opentelemetry compatible collectors.

Signed-off-by: Jayadeep KM <[email protected]>
  • Loading branch information
kmjayadeep committed Nov 30, 2023
1 parent 30540fc commit 7fc5fd1
Show file tree
Hide file tree
Showing 14 changed files with 480 additions and 50 deletions.
2 changes: 2 additions & 0 deletions config/config-tracing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ data:
# API endpoint to send the traces to
# (optional): The default value is given below
endpoint: "http://jaeger-collector.jaeger.svc.cluster.local:14268/api/traces"
# (optional) Name of the k8s secret which contains basic auth credentials
credentialsSecret: "jaeger-creds"
7 changes: 0 additions & 7 deletions config/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,6 @@ spec:
value: /etc/ssl/certs
- name: METRICS_DOMAIN
value: tekton.dev/pipeline
# The following variables can be uncommented with correct values to enable Jaeger tracing
#- name: OTEL_EXPORTER_JAEGER_ENDPOINT
# value: http://jaeger-collector.jaeger:14268/api/traces
#- name: OTEL_EXPORTER_JAEGER_USER
# value: username
#- name: OTEL_EXPORTER_JAEGER_PASSWORD
# value: password
securityContext:
allowPrivilegeEscalation: false
capabilities:
Expand Down
6 changes: 1 addition & 5 deletions docs/developers/tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,4 @@ The configmap `config/config-tracing.yaml` contains the configuration for tracin

* enabled: Set this to true to enable tracing
* endpoint: API endpoint for jaeger collector to send the traces. By default the endpoint is configured to be `http://jaeger-collector.jaeger.svc.cluster.local:14268/api/traces`.

Tekton pipelines controller also supports the following additional environment variables to be able to connect to jaeger:

* `OTEL_EXPORTER_JAEGER_USER` is the username to be sent as authentication to the collector endpoint.
* `OTEL_EXPORTER_JAEGER_PASSWORD` is the password to be sent as authentication to the collector endpoint.
* credentialsSecret: Name of the secret which contains `username` and `password` to authenticate against the endpoint
15 changes: 12 additions & 3 deletions pkg/apis/config/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
// tracingEndpintKey is the configmap key for tracing api endpoint
tracingEndpointKey = "endpoint"

// tracingCredentialsSecretKey is the name of the secret which contains credentials for tracing endpoint
tracingCredentialsSecretKey = "credentialsSecret"

// DefaultEndpoint is the default destination for sending traces
DefaultEndpoint = "http://jaeger-collector.jaeger.svc.cluster.local:14268/api/traces"
)
Expand All @@ -40,8 +43,9 @@ var DefaultTracing, _ = newTracingFromMap(map[string]string{})
// Tracing holds the configurations for tracing
// +k8s:deepcopy-gen=true
type Tracing struct {
Enabled bool
Endpoint string
Enabled bool
Endpoint string
CredentialsSecret string
}

// Equals returns true if two Configs are identical
Expand All @@ -55,7 +59,8 @@ func (cfg *Tracing) Equals(other *Tracing) bool {
}

return other.Enabled == cfg.Enabled &&
other.Endpoint == cfg.Endpoint
other.Endpoint == cfg.Endpoint &&
other.CredentialsSecret == cfg.CredentialsSecret
}

// GetTracingConfigName returns the name of the configmap containing all
Expand All @@ -78,6 +83,10 @@ func newTracingFromMap(config map[string]string) (*Tracing, error) {
t.Endpoint = endpoint
}

if secret, ok := config[tracingCredentialsSecretKey]; ok {
t.CredentialsSecret = secret
}

if enabled, ok := config[tracingEnabledKey]; ok {
e, err := strconv.ParseBool(enabled)
if err != nil {
Expand Down
87 changes: 87 additions & 0 deletions pkg/apis/config/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,90 @@ func TestNewTracingFromConfigMap(t *testing.T) {
})
}
}

func TestTracingEquals(t *testing.T) {
testCases := []struct {
name string
left *config.Tracing
right *config.Tracing
expected bool
}{
{
name: "left and right nil",
left: nil,
right: nil,
expected: true,
},
{
name: "left nil",
left: nil,
right: &config.Tracing{},
expected: false,
},
{
name: "right nil",
left: &config.Tracing{},
right: nil,
expected: false,
},
{
name: "right and right default",
left: &config.Tracing{},
right: &config.Tracing{},
expected: true,
},
{
name: "different enabled",
left: &config.Tracing{
Enabled: true,
},
right: &config.Tracing{
Enabled: false,
},
expected: false,
},
{
name: "different endpoint",
left: &config.Tracing{
Endpoint: "a",
},
right: &config.Tracing{
Endpoint: "b",
},
expected: false,
},
{
name: "different credentialsSecret",
left: &config.Tracing{
CredentialsSecret: "a",
},
right: &config.Tracing{
CredentialsSecret: "b",
},
expected: false,
},
{
name: "same all fields",
left: &config.Tracing{
Enabled: true,
Endpoint: "a",
CredentialsSecret: "b",
},
right: &config.Tracing{
Enabled: true,
Endpoint: "a",
CredentialsSecret: "b",
},
expected: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := tc.left.Equals(tc.right)
if actual != tc.expected {
t.Errorf("Comparison failed expected: %t, actual: %t", tc.expected, actual)
}
})
}
}
10 changes: 8 additions & 2 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/utils/clock"
kubeclient "knative.dev/pkg/client/injection/kube/client"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -59,9 +60,10 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
pipelineRunInformer := pipelineruninformer.Get(ctx)
resolutionInformer := resolutioninformer.Get(ctx)
verificationpolicyInformer := verificationpolicyinformer.Get(ctx)
tracerProvider := tracing.New(TracerProviderName)
secretinformer := secretinformer.Get(ctx)
tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing"))
//nolint:contextcheck // OnStore methods does not support context as a parameter
configStore := config.NewStore(logger.Named("config-store"), pipelinerunmetrics.MetricsOnStore(logger), tracerProvider.OnStore(logger))
configStore := config.NewStore(logger.Named("config-store"), pipelinerunmetrics.MetricsOnStore(logger), tracerProvider.OnStore(secretinformer.Lister()))
configStore.WatchConfigs(cmw)

c := &Reconciler{
Expand All @@ -86,6 +88,10 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
}
})

if _, err := secretinformer.Informer().AddEventHandler(controller.HandleAll(tracerProvider.Handler)); err != nil {
logging.FromContext(ctx).Panicf("Couldn't register Secret informer event handler: %w", err)
}

if _, err := pipelineRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)); err != nil {
logging.FromContext(ctx).Panicf("Couldn't register PipelineRun informer event handler: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16735,7 +16735,7 @@ func Test_runNextSchedulableTask(t *testing.T) {
Clock: clock.NewFakePassiveClock(time.Now()),
KubeClientSet: testAssets.Clients.Kube,
PipelineClientSet: testAssets.Clients.Pipeline,
tracerProvider: tracing.New("pipelinerun"),
tracerProvider: tracing.New("pipelinerun", logging.FromContext(ctx)),
}
err := c.runNextSchedulableTask(ctx, tc.pr, tc.pipelineRunFacts)
if (err != nil) != tc.wantErr {
Expand Down
10 changes: 8 additions & 2 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
kubeclient "knative.dev/pkg/client/injection/kube/client"
limitrangeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/limitrange"
filteredpodinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -61,10 +62,11 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
limitrangeInformer := limitrangeinformer.Get(ctx)
verificationpolicyInformer := verificationpolicyinformer.Get(ctx)
resolutionInformer := resolutioninformer.Get(ctx)
secretinformer := secretinformer.Get(ctx)
spireClient := spire.GetControllerAPIClient(ctx)
tracerProvider := tracing.New(TracerProviderName)
tracerProvider := tracing.New(TracerProviderName, logger.Named("tracing"))
//nolint:contextcheck // OnStore methods does not support context as a parameter
configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger), spire.OnStore(ctx, logger), tracerProvider.OnStore(logger))
configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger), spire.OnStore(ctx, logger), tracerProvider.OnStore(secretinformer.Lister()))
configStore.WatchConfigs(cmw)

entrypointCache, err := pod.NewEntrypointCache(kubeclientset)
Expand Down Expand Up @@ -96,6 +98,10 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
}
})

if _, err := secretinformer.Informer().AddEventHandler(controller.HandleAll(tracerProvider.Handler)); err != nil {
logging.FromContext(ctx).Panicf("Couldn't register Secret informer event handler: %w", err)
}

if _, err := taskRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)); err != nil {
logging.FromContext(ctx).Panicf("Couldn't register TaskRun informer event handler: %w", err)
}
Expand Down
106 changes: 83 additions & 23 deletions pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,69 +28,129 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"knative.dev/pkg/system"
)

type tracerProvider struct {
service string
provider trace.TracerProvider
cfg *config.Tracing
username string
password string
logger *zap.SugaredLogger
}

func init() {
otel.SetTextMapPropagator(propagation.TraceContext{})
}

// New returns a new instance of tracerProvider for the given service
func New(service string) *tracerProvider {
func New(service string, logger *zap.SugaredLogger) *tracerProvider {
return &tracerProvider{
service: service,
provider: trace.NewNoopTracerProvider(),
logger: logger,
}
}

// OnStore configures tracerProvider dynamically
func (t *tracerProvider) OnStore(logger *zap.SugaredLogger) func(name string, value interface{}) {
func (t *tracerProvider) OnStore(lister listerv1.SecretLister) func(name string, value interface{}) {
return func(name string, value interface{}) {
if name == config.GetTracingConfigName() {
cfg, ok := value.(*config.Tracing)
if !ok {
logger.Error("Failed to do type assertion for extracting TRACING config")
return
}
if name != config.GetTracingConfigName() {
return
}

if cfg.Equals(t.cfg) {
logger.Info("Tracing config unchanged", cfg, t.cfg)
return
}
t.cfg = cfg
cfg, ok := value.(*config.Tracing)
if !ok {
t.logger.Error("tracing configmap is in invalid format. value: %v", value)
return
}

if cfg.Equals(t.cfg) {
t.logger.Info("tracing config unchanged", cfg, t.cfg)
return
}
t.cfg = cfg

tp, err := createTracerProvider(t.service, cfg)
if lister != nil && cfg.CredentialsSecret != "" {
sec, err := lister.Secrets(system.Namespace()).Get(cfg.CredentialsSecret)
if err != nil {
logger.Errorf("Unable to initialize tracing with error : %v", err.Error())
t.logger.Errorf("unable to initialize tracing with error : %v", err.Error())
return
}
logger.Info("Initialized Tracer Provider")
if p, ok := t.provider.(*tracesdk.TracerProvider); ok {
if err := p.Shutdown(context.Background()); err != nil {
logger.Errorf("Unable to shutdown tracingprovider with error : %v", err.Error())
}
}
t.provider = tp
creds := sec.Data
t.username = string(creds["username"])
t.password = string(creds["password"])
} else {
t.username = ""
t.password = ""
}

t.reinitialize()
}
}

func (t *tracerProvider) Tracer(name string, options ...trace.TracerOption) trace.Tracer {
return t.provider.Tracer(name, options...)
}

func createTracerProvider(service string, cfg *config.Tracing) (trace.TracerProvider, error) {
// Handler is called by the informer when the secret is updated
func (t *tracerProvider) Handler(obj interface{}) {
secret, ok := obj.(*corev1.Secret)
if !ok {
t.logger.Error("Failed to do type assertion for Secret")
return
}
t.OnSecret(secret)
}

func (t *tracerProvider) OnSecret(secret *corev1.Secret) {
if secret.Name != t.cfg.CredentialsSecret {
return
}

creds := secret.Data
username := string(creds["username"])
password := string(creds["password"])

if t.username == username && t.password == password {
// No change in credentials, no need to reinitialize
return
}
t.username = username
t.password = password

t.logger.Debugf("tracing credentials updated, reinitializing tracingprovider with secret: %v", secret.Name)

t.reinitialize()
}

func (t *tracerProvider) reinitialize() {
tp, err := createTracerProvider(t.service, t.cfg, t.username, t.password)
if err != nil {
t.logger.Errorf("unable to initialize tracing with error : %v", err.Error())
return
}
t.logger.Info("initialized Tracer Provider")
if p, ok := t.provider.(*tracesdk.TracerProvider); ok {
if err := p.Shutdown(context.Background()); err != nil {
t.logger.Errorf("unable to shutdown tracingprovider with error : %v", err.Error())
}
}
t.provider = tp
}

func createTracerProvider(service string, cfg *config.Tracing, user, pass string) (trace.TracerProvider, error) {
if !cfg.Enabled {
return trace.NewNoopTracerProvider(), nil
}

exp, err := jaeger.New(jaeger.WithCollectorEndpoint(
jaeger.WithEndpoint(cfg.Endpoint),
jaeger.WithUsername(user),
jaeger.WithPassword(pass),
))
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 7fc5fd1

Please sign in to comment.