From 6b2774d56bb0f66a8477f35f48cccf090ae0de2b Mon Sep 17 00:00:00 2001 From: Reto Lehmann Date: Wed, 13 Mar 2024 11:24:29 +0100 Subject: [PATCH] [RELEASE-1.13] Back-port multi-container probing (#658) * Add multi-container probing (#14853) * Add multi-container probing * Add e2e test for multi container probing (cherry picked from commit a194cb210d5ee990d8a9a4c80056c18cc54e30da) * Run generate-release.sh --- config/core/configmaps/features.yaml | 10 +- openshift/release/artifacts/serving-core.yaml | 10 +- pkg/apis/config/features.go | 3 + pkg/apis/config/features_test.go | 20 + pkg/apis/serving/k8s_validation.go | 61 +++- pkg/apis/serving/k8s_validation_test.go | 344 +++++++++++++++++- pkg/apis/serving/v1/revision_defaults.go | 15 +- pkg/apis/serving/v1/revision_defaults_test.go | 245 +++++++++++++ pkg/apis/serving/v1/revision_helpers.go | 19 + pkg/apis/serving/v1/revision_helpers_test.go | 56 +++ pkg/queue/readiness/probe.go | 103 ++++-- pkg/queue/readiness/probe_encoding.go | 39 +- pkg/queue/readiness/probe_encoding_test.go | 115 +++++- pkg/queue/readiness/probe_test.go | 239 ++++++++---- pkg/queue/sharedmain/main.go | 15 +- pkg/reconciler/revision/resources/deploy.go | 37 +- .../revision/resources/deploy_test.go | 40 ++ pkg/reconciler/revision/resources/queue.go | 101 +++-- .../revision/resources/queue_test.go | 21 +- test/e2e-tests.sh | 5 + .../multicontainer_readiness_test.go | 97 +++++ 21 files changed, 1394 insertions(+), 201 deletions(-) create mode 100644 test/e2e/multicontainerprobing/multicontainer_readiness_test.go diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml index f4c42983a1e8..51dfde27f7c2 100644 --- a/config/core/configmaps/features.yaml +++ b/config/core/configmaps/features.yaml @@ -22,7 +22,7 @@ metadata: app.kubernetes.io/component: controller app.kubernetes.io/version: devel annotations: - knative.dev/example-checksum: "f2fc138e" + knative.dev/example-checksum: "632d47dd" data: _example: |- ################################ @@ -50,9 +50,15 @@ data: # Indicates whether multi container support is enabled # # WARNING: Cannot safely be disabled once enabled. - # See: https://knative.dev/docs/serving/feature-flags/#multi-containers + # See: https://knative.dev/docs/serving/configuration/feature-flags/#multiple-containers multi-container: "enabled" + # Indicates whether multi container probing is enabled + # + # WARNING: Cannot safely be disabled once enabled. + # See: https://knative.dev/docs/serving/configuration/feature-flags/#multiple-container-probing + multi-container-probing: "disabled" + # Indicates whether Kubernetes affinity support is enabled # # WARNING: Cannot safely be disabled once enabled. diff --git a/openshift/release/artifacts/serving-core.yaml b/openshift/release/artifacts/serving-core.yaml index 8da7e410971b..eddb78429cb1 100644 --- a/openshift/release/artifacts/serving-core.yaml +++ b/openshift/release/artifacts/serving-core.yaml @@ -5280,7 +5280,7 @@ metadata: app.kubernetes.io/component: controller app.kubernetes.io/version: "release-v1.13" annotations: - knative.dev/example-checksum: "f2fc138e" + knative.dev/example-checksum: "632d47dd" data: _example: |- ################################ @@ -5308,9 +5308,15 @@ data: # Indicates whether multi container support is enabled # # WARNING: Cannot safely be disabled once enabled. 
- # See: https://knative.dev/docs/serving/feature-flags/#multi-containers + # See: https://knative.dev/docs/serving/configuration/feature-flags/#multiple-containers multi-container: "enabled" + # Indicates whether multi container probing is enabled + # + # WARNING: Cannot safely be disabled once enabled. + # See: https://knative.dev/docs/serving/configuration/feature-flags/#multiple-container-probing + multi-container-probing: "disabled" + # Indicates whether Kubernetes affinity support is enabled # # WARNING: Cannot safely be disabled once enabled. diff --git a/pkg/apis/config/features.go b/pkg/apis/config/features.go index 0aec2ebca30f..63234c30e13a 100644 --- a/pkg/apis/config/features.go +++ b/pkg/apis/config/features.go @@ -54,6 +54,7 @@ const ( func defaultFeaturesConfig() *Features { return &Features{ MultiContainer: Enabled, + MultiContainerProbing: Disabled, PodSpecAffinity: Disabled, PodSpecTopologySpreadConstraints: Disabled, PodSpecDryRun: Allowed, @@ -87,6 +88,7 @@ func NewFeaturesConfigFromMap(data map[string]string) (*Features, error) { if err := cm.Parse(data, asFlag("multi-container", &nc.MultiContainer), + asFlag("multi-container-probing", &nc.MultiContainerProbing), asFlag("kubernetes.podspec-affinity", &nc.PodSpecAffinity), asFlag("kubernetes.podspec-topologyspreadconstraints", &nc.PodSpecTopologySpreadConstraints), asFlag("kubernetes.podspec-dryrun", &nc.PodSpecDryRun), @@ -124,6 +126,7 @@ func NewFeaturesConfigFromConfigMap(config *corev1.ConfigMap) (*Features, error) // Features specifies which features are allowed by the webhook. type Features struct { MultiContainer Flag + MultiContainerProbing Flag PodSpecAffinity Flag PodSpecTopologySpreadConstraints Flag PodSpecDryRun Flag diff --git a/pkg/apis/config/features_test.go b/pkg/apis/config/features_test.go index 2b465cbb6b68..ae0ee2ab3a6a 100644 --- a/pkg/apis/config/features_test.go +++ b/pkg/apis/config/features_test.go @@ -60,6 +60,7 @@ func TestFeaturesConfiguration(t *testing.T) { wantErr: false, wantFeatures: defaultWith(&Features{ MultiContainer: Enabled, + MultiContainerProbing: Enabled, PodSpecAffinity: Enabled, PodSpecTopologySpreadConstraints: Enabled, PodSpecDryRun: Enabled, @@ -79,6 +80,7 @@ func TestFeaturesConfiguration(t *testing.T) { }), data: map[string]string{ "multi-container": "Enabled", + "multi-container-probing": "Enabled", "kubernetes.podspec-affinity": "Enabled", "kubernetes.podspec-topologyspreadconstraints": "Enabled", "kubernetes.podspec-dryrun": "Enabled", @@ -114,6 +116,24 @@ func TestFeaturesConfiguration(t *testing.T) { data: map[string]string{ "multi-container": "Disabled", }, + }, { + name: "multi-container-probing Allowed", + wantErr: false, + wantFeatures: defaultWith(&Features{ + MultiContainerProbing: Allowed, + }), + data: map[string]string{ + "multi-container-probing": "Allowed", + }, + }, { + name: "multi-container-probing Disabled", + wantErr: false, + wantFeatures: defaultWith(&Features{ + MultiContainerProbing: Disabled, + }), + data: map[string]string{ + "multi-container-probing": "Disabled", + }, }, { name: "kubernetes.podspec-affinity Allowed", wantErr: false, diff --git a/pkg/apis/serving/k8s_validation.go b/pkg/apis/serving/k8s_validation.go index fe2f95f5c371..091aafcd0b94 100644 --- a/pkg/apis/serving/k8s_validation.go +++ b/pkg/apis/serving/k8s_validation.go @@ -387,7 +387,7 @@ func ValidatePodSpec(ctx context.Context, ps corev1.PodSpec) *apis.FieldError { case 0: errs = errs.Also(apis.ErrMissingField("containers")) case 1: - errs = 
errs.Also(ValidateContainer(ctx, ps.Containers[0], volumes, port).
+		errs = errs.Also(ValidateUserContainer(ctx, ps.Containers[0], volumes, port).
 			ViaFieldIndex("containers", 0))
 	default:
 		errs = errs.Also(validateContainers(ctx, ps.Containers, volumes, port))
@@ -447,7 +447,7 @@ func validateContainers(ctx context.Context, containers []corev1.Container, volu
 			// Note, if we allow readiness/liveness checks on sidecars, we should pass in an *empty* port here, not the main container's port.
 			errs = errs.Also(validateSidecarContainer(WithinSidecarContainer(ctx), containers[i], volumes).ViaFieldIndex("containers", i))
 		} else {
-			errs = errs.Also(ValidateContainer(WithinUserContainer(ctx), containers[i], volumes, port).ViaFieldIndex("containers", i))
+			errs = errs.Also(ValidateUserContainer(WithinUserContainer(ctx), containers[i], volumes, port).ViaFieldIndex("containers", i))
 		}
 	}
 	return errs
@@ -503,14 +503,23 @@ func validateContainersPorts(containers []corev1.Container) (corev1.ContainerPor
 
 // validateSidecarContainer validate fields for non serving containers
 func validateSidecarContainer(ctx context.Context, container corev1.Container, volumes map[string]corev1.Volume) (errs *apis.FieldError) {
-	if container.LivenessProbe != nil {
-		errs = errs.Also(apis.CheckDisallowedFields(*container.LivenessProbe,
-			*ProbeMask(&corev1.Probe{})).ViaField("livenessProbe"))
-	}
-	if container.ReadinessProbe != nil {
-		errs = errs.Also(apis.CheckDisallowedFields(*container.ReadinessProbe,
-			*ProbeMask(&corev1.Probe{})).ViaField("readinessProbe"))
+	cfg := config.FromContextOrDefaults(ctx)
+	if cfg.Features.MultiContainerProbing != config.Enabled {
+		if container.LivenessProbe != nil {
+			errs = errs.Also(apis.CheckDisallowedFields(*container.LivenessProbe,
+				*ProbeMask(&corev1.Probe{})).ViaField("livenessProbe"))
+		}
+		if container.ReadinessProbe != nil {
+			errs = errs.Also(apis.CheckDisallowedFields(*container.ReadinessProbe,
+				*ProbeMask(&corev1.Probe{})).ViaField("readinessProbe"))
+		}
+	} else {
+		// Liveness Probes
+		errs = errs.Also(validateProbe(container.LivenessProbe, nil, false).ViaField("livenessProbe"))
+		// Readiness Probes
+		errs = errs.Also(validateReadinessProbe(container.ReadinessProbe, nil, false).ViaField("readinessProbe"))
 	}
+
 	return errs.Also(validate(ctx, container, volumes))
 }
 
@@ -544,12 +553,12 @@ func validateInitContainer(ctx context.Context, container corev1.Container, volu
 	return errs.Also(validate(WithinInitContainer(ctx), container, volumes))
 }
 
-// ValidateContainer validate fields for serving containers
-func ValidateContainer(ctx context.Context, container corev1.Container, volumes map[string]corev1.Volume, port corev1.ContainerPort) (errs *apis.FieldError) {
+// ValidateUserContainer validates fields for serving containers
+func ValidateUserContainer(ctx context.Context, container corev1.Container, volumes map[string]corev1.Volume, port corev1.ContainerPort) (errs *apis.FieldError) {
 	// Liveness Probes
-	errs = errs.Also(validateProbe(container.LivenessProbe, port).ViaField("livenessProbe"))
+	errs = errs.Also(validateProbe(container.LivenessProbe, &port, true).ViaField("livenessProbe"))
 	// Readiness Probes
-	errs = errs.Also(validateReadinessProbe(container.ReadinessProbe, port).ViaField("readinessProbe"))
+	errs = errs.Also(validateReadinessProbe(container.ReadinessProbe, &port, true).ViaField("readinessProbe"))
 
 	return errs.Also(validate(ctx, container, volumes))
 }
 
@@ -751,12 +760,12 @@ func
validateContainerPortBasic(port corev1.ContainerPort) *apis.FieldError { return errs } -func validateReadinessProbe(p *corev1.Probe, port corev1.ContainerPort) *apis.FieldError { +func validateReadinessProbe(p *corev1.Probe, port *corev1.ContainerPort, isUserContainer bool) *apis.FieldError { if p == nil { return nil } - errs := validateProbe(p, port) + errs := validateProbe(p, port, isUserContainer) if p.PeriodSeconds < 0 { errs = errs.Also(apis.ErrOutOfBoundsValue(p.PeriodSeconds, 0, math.MaxInt32, "periodSeconds")) @@ -798,7 +807,7 @@ func validateReadinessProbe(p *corev1.Probe, port corev1.ContainerPort) *apis.Fi return errs } -func validateProbe(p *corev1.Probe, port corev1.ContainerPort) *apis.FieldError { +func validateProbe(p *corev1.Probe, port *corev1.ContainerPort, isUserContainer bool) *apis.FieldError { if p == nil { return nil } @@ -813,16 +822,28 @@ func validateProbe(p *corev1.Probe, port corev1.ContainerPort) *apis.FieldError handlers = append(handlers, "httpGet") errs = errs.Also(apis.CheckDisallowedFields(*h.HTTPGet, *HTTPGetActionMask(h.HTTPGet))).ViaField("httpGet") getPort := h.HTTPGet.Port - if getPort.StrVal != "" && getPort.StrVal != port.Name { - errs = errs.Also(apis.ErrInvalidValue(getPort.String(), "httpGet.port", "Probe port must match container port")) + if isUserContainer { + if getPort.StrVal != "" && getPort.StrVal != port.Name { + errs = errs.Also(apis.ErrInvalidValue(getPort.String(), "httpGet.port", "Probe port must match container port")) + } + } else { + if getPort.StrVal == "" && getPort.IntVal == 0 { + errs = errs.Also(apis.ErrInvalidValue(getPort.String(), "httpGet.port", "Probe port must be specified")) + } } } if h.TCPSocket != nil { handlers = append(handlers, "tcpSocket") errs = errs.Also(apis.CheckDisallowedFields(*h.TCPSocket, *TCPSocketActionMask(h.TCPSocket))).ViaField("tcpSocket") tcpPort := h.TCPSocket.Port - if tcpPort.StrVal != "" && tcpPort.StrVal != port.Name { - errs = errs.Also(apis.ErrInvalidValue(tcpPort.String(), "tcpSocket.port", "Probe port must match container port")) + if isUserContainer { + if tcpPort.StrVal != "" && tcpPort.StrVal != port.Name { + errs = errs.Also(apis.ErrInvalidValue(tcpPort.String(), "tcpSocket.port", "Probe port must match container port")) + } + } else { + if tcpPort.StrVal == "" && tcpPort.IntVal == 0 { + errs = errs.Also(apis.ErrInvalidValue(tcpPort.String(), "tcpSocket.port", "Probe port must be specified")) + } } } if h.Exec != nil { diff --git a/pkg/apis/serving/k8s_validation_test.go b/pkg/apis/serving/k8s_validation_test.go index 13bbbcc46277..a98132feb10f 100644 --- a/pkg/apis/serving/k8s_validation_test.go +++ b/pkg/apis/serving/k8s_validation_test.go @@ -162,8 +162,15 @@ func withPodSpecInitContainersEnabled() configOption { cfg.Features.PodSpecInitContainers = config.Enabled return cfg } +} +func withMultiContainerProbesEnabled() configOption { + return func(cfg *config.Config) *config.Config { + cfg.Features.MultiContainerProbing = config.Enabled + return cfg + } } + func withPodSpecDNSPolicyEnabled() configOption { return func(cfg *config.Config) *config.Config { cfg.Features.PodSpecDNSPolicy = config.Enabled @@ -1485,7 +1492,7 @@ func TestPodSpecFieldRefValidation(t *testing.T) { } } -func TestContainerValidation(t *testing.T) { +func TestUserContainerValidation(t *testing.T) { tests := []containerValidationTestCase{ { name: "has a lifecycle", @@ -1954,10 +1961,341 @@ func TestContainerValidation(t *testing.T) { } port, err := validateContainersPorts([]corev1.Container{test.c}) - got := 
err.Also(ValidateContainer(ctx, test.c, test.volumes, port)) + got := err.Also(ValidateUserContainer(ctx, test.c, test.volumes, port)) got = got.Filter(apis.ErrorLevel) if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { - t.Errorf("ValidateContainer (-want, +got): \n%s", diff) + t.Errorf("ValidateUserContainer (-want, +got): \n%s", diff) + } + }) + } +} + +func TestSidecarContainerValidation(t *testing.T) { + tests := []containerValidationTestCase{ + { + name: "probes not allowed", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, + }, + }, + want: apis.ErrDisallowedFields("livenessProbe", "readinessProbe", "readinessProbe.failureThreshold", "readinessProbe.periodSeconds", "readinessProbe.successThreshold", "readinessProbe.timeoutSeconds"), + }, + { + name: "invalid probes (no port defined)", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + }, + }, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{}, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: apis.ErrInvalidValue(0, "livenessProbe.tcpSocket.port, readinessProbe.httpGet.port", "Probe port must be specified"), + }, { + name: "valid with exec probes", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + InitialDelaySeconds: 0, + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt32(5000), + }, + }, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{}, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: nil, + }, { + name: "invalid with no handler", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt32(5000), + }, + }, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{}, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: apis.ErrMissingOneOf("livenessProbe.httpGet", "livenessProbe.tcpSocket", "livenessProbe.exec", "livenessProbe.grpc"), + }, { + name: "invalid with multiple handlers", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt32(5000), + }, + Exec: &corev1.ExecAction{}, + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt32(5000), + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: apis.ErrMultipleOneOf("readinessProbe.exec", "readinessProbe.tcpSocket", "readinessProbe.httpGet"), + }, { + name: "valid liveness http probe", + c: corev1.Container{ + Image: 
"foo", + LivenessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt(5000), + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: nil, + }, { + name: "valid liveness tcp probe", + c: corev1.Container{ + Image: "foo", + LivenessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt(5000), + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: nil, + }, { + name: "valid readiness http probe", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt(5000), + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: nil, + }, { + name: "valid readiness tcp probe", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt(5000), + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: nil, + }, { + name: "valid readiness http probe with named port", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.FromString("http"), // http is the default + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: nil, + }, { + name: "invalid readiness probe (has failureThreshold while using special probe)", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 0, + FailureThreshold: 2, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt(5000), + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: &apis.FieldError{ + Message: "failureThreshold is disallowed when periodSeconds is zero", + Paths: []string{"readinessProbe.failureThreshold"}, + }, + }, { + name: "invalid readiness probe (has timeoutSeconds while using special probe)", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 0, + TimeoutSeconds: 2, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt(5000), + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: &apis.FieldError{ + Message: "timeoutSeconds is disallowed when periodSeconds is zero", + Paths: []string{"readinessProbe.timeoutSeconds"}, + }, + }, { + name: "out of bounds probe values", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: -1, + TimeoutSeconds: 0, + SuccessThreshold: 0, + FailureThreshold: 0, + InitialDelaySeconds: -1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.FromInt(5000), + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: apis.ErrOutOfBoundsValue(-1, 0, 
math.MaxInt32, "readinessProbe.periodSeconds").Also( + apis.ErrOutOfBoundsValue(0, 1, math.MaxInt32, "readinessProbe.timeoutSeconds")).Also( + apis.ErrOutOfBoundsValue(0, 1, math.MaxInt32, "readinessProbe.successThreshold")).Also( + apis.ErrOutOfBoundsValue(0, 1, math.MaxInt32, "readinessProbe.failureThreshold")).Also( + apis.ErrOutOfBoundsValue(-1, 0, math.MaxInt32, "readinessProbe.initialDelaySeconds")), + }, { + name: "valid grpc probe", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + GRPC: &corev1.GRPCAction{ + Port: 46, + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: nil, + }, { + name: "valid grpc probe with service", + c: corev1.Container{ + Image: "foo", + ReadinessProbe: &corev1.Probe{ + PeriodSeconds: 1, + TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, + ProbeHandler: corev1.ProbeHandler{ + GRPC: &corev1.GRPCAction{ + Port: 46, + Service: ptr.String("foo"), + }, + }, + }, + }, + cfgOpts: []configOption{withMultiContainerProbesEnabled()}, + want: nil, + }, + } + tests = append(tests, getCommonContainerValidationTestCases()...) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if test.cfgOpts != nil { + cfg := config.FromContextOrDefaults(ctx) + for _, opt := range test.cfgOpts { + cfg = opt(cfg) + } + ctx = config.ToContext(ctx, cfg) + } + err := validateSidecarContainer(ctx, test.c, test.volumes) + err = err.Filter(apis.ErrorLevel) + if diff := cmp.Diff(test.want.Error(), err.Error()); diff != "" { + t.Errorf("validateSidecarContainer (-want, +got): \n%s", diff) } }) } diff --git a/pkg/apis/serving/v1/revision_defaults.go b/pkg/apis/serving/v1/revision_defaults.go index be91eb5c5d9e..61e91e9e8184 100644 --- a/pkg/apis/serving/v1/revision_defaults.go +++ b/pkg/apis/serving/v1/revision_defaults.go @@ -132,9 +132,10 @@ func (rs *RevisionSpec) applyDefault(ctx context.Context, container *corev1.Cont // If there are multiple containers then default probes will be applied to the container where user specified PORT // default probes will not be applied for non serving containers if len(rs.PodSpec.Containers) == 1 || len(container.Ports) != 0 { - rs.applyProbesWithDefaults(container) - rs.applyGRPCProbeDefaults(container) + rs.applyUserContainerDefaultReadinessProbe(container) } + rs.applyReadinessProbeDefaults(container) + rs.applyGRPCProbeDefaults(container) if rs.PodSpec.EnableServiceLinks == nil && apis.IsInCreate(ctx) { rs.PodSpec.EnableServiceLinks = cfg.Defaults.EnableServiceLinks @@ -154,7 +155,7 @@ func (rs *RevisionSpec) applyDefault(ctx context.Context, container *corev1.Cont } } -func (*RevisionSpec) applyProbesWithDefaults(container *corev1.Container) { +func (*RevisionSpec) applyUserContainerDefaultReadinessProbe(container *corev1.Container) { if container.ReadinessProbe == nil { container.ReadinessProbe = &corev1.Probe{} } @@ -164,6 +165,14 @@ func (*RevisionSpec) applyProbesWithDefaults(container *corev1.Container) { container.ReadinessProbe.GRPC == nil { container.ReadinessProbe.TCPSocket = &corev1.TCPSocketAction{} } +} + +func (*RevisionSpec) applyReadinessProbeDefaults(container *corev1.Container) { + if container.ReadinessProbe == nil { + // Sidecars are allowed to not have a readiness-probe + // we do not want the defaults in that case. 
+ return + } if container.ReadinessProbe.SuccessThreshold == 0 { container.ReadinessProbe.SuccessThreshold = 1 diff --git a/pkg/apis/serving/v1/revision_defaults_test.go b/pkg/apis/serving/v1/revision_defaults_test.go index 0374f85a2128..fd06263a0451 100644 --- a/pkg/apis/serving/v1/revision_defaults_test.go +++ b/pkg/apis/serving/v1/revision_defaults_test.go @@ -1054,6 +1054,251 @@ func TestRevisionDefaulting(t *testing.T) { }, }, }, + }, { + name: "multiple containers with default probes", + in: &Revision{ + Spec: RevisionSpec{ + ContainerConcurrency: ptr.Int64(1), + TimeoutSeconds: ptr.Int64(99), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "foo", + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.2", + }, + }, + }, + }, { + Name: "second", + }}, + }, + }, + }, + want: &Revision{ + Spec: RevisionSpec{ + ContainerConcurrency: ptr.Int64(1), + TimeoutSeconds: ptr.Int64(99), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "foo", + Resources: defaultResources, + ReadinessProbe: &corev1.Probe{ + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.2", + }, + }, + }, + }, { + Name: "second", + Resources: defaultResources, + }}, + }, + }, + }, + }, { + name: "multiple containers with probes no override", + in: &Revision{ + Spec: RevisionSpec{ + ContainerConcurrency: ptr.Int64(1), + TimeoutSeconds: ptr.Int64(99), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "foo", + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.2", + }, + }, + }, + }, { + Name: "second", + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.2", + }, + }, + }, + }}, + }, + }, + }, + want: &Revision{ + Spec: RevisionSpec{ + ContainerConcurrency: ptr.Int64(1), + TimeoutSeconds: ptr.Int64(99), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "foo", + Resources: defaultResources, + ReadinessProbe: &corev1.Probe{ + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.2", + }, + }, + }, + }, { + Name: "second", + Resources: defaultResources, + ReadinessProbe: &corev1.Probe{ + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.2", + }, + }, + }, + }}, + }, + }, + }, + }, { + name: "multiple containers with exec probes no override", + in: &Revision{ + Spec: RevisionSpec{ + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"echo", "hi"}, + }, + }, + }, + }, { + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"echo", "hi"}, + }, + }, + }, + }}, + }, + }, + }, + want: &Revision{ + Spec: RevisionSpec{ + TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), + ContainerConcurrency: ptr.Int64(config.DefaultContainerConcurrency), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: config.DefaultUserContainerName + "-0", + Resources: defaultResources, + ReadinessProbe: &corev1.Probe{ + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"echo", "hi"}, + }, + }, + }, + }, { + Name: 
config.DefaultUserContainerName + "-1", + Resources: defaultResources, + ReadinessProbe: &corev1.Probe{ + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"echo", "hi"}, + }, + }, + }, + }}, + }, + }, + }, + }, { + name: "multiple containers apply k8s defaults when period seconds has a non zero value", + in: &Revision{ + Spec: RevisionSpec{ + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Ports: []corev1.ContainerPort{{ + ContainerPort: 8080, + }}, + ReadinessProbe: &corev1.Probe{ + // FailureThreshold and TimeoutSeconds missing + PeriodSeconds: 10, + }, + }, { + ReadinessProbe: &corev1.Probe{ + // FailureThreshold and TimeoutSeconds missing + PeriodSeconds: 10, + }, + }}, + }, + }, + }, + want: &Revision{ + Spec: RevisionSpec{ + ContainerConcurrency: ptr.Int64(config.DefaultContainerConcurrency), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: config.DefaultUserContainerName + "-0", + Ports: []corev1.ContainerPort{{ + ContainerPort: 8080, + }}, + ReadinessProbe: &corev1.Probe{ + FailureThreshold: 3, // Added as k8s default + ProbeHandler: defaultProbe.ProbeHandler, + PeriodSeconds: 10, + SuccessThreshold: 1, + TimeoutSeconds: 1, // Added as k8s default + }, + Resources: defaultResources, + }, { + Name: config.DefaultUserContainerName + "-1", + ReadinessProbe: &corev1.Probe{ + FailureThreshold: 3, // Added as k8s default + PeriodSeconds: 10, + SuccessThreshold: 1, + TimeoutSeconds: 1, // Added as k8s default + }, + Resources: defaultResources, + }}, + }, + TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), + }, + }, + }, { + name: "multiple containers partially initialized", + in: &Revision{ + Spec: RevisionSpec{ + PodSpec: corev1.PodSpec{Containers: []corev1.Container{{ + Ports: []corev1.ContainerPort{{ + ContainerPort: 8080, + }}, + }, {}}}, + }, + }, + want: &Revision{ + Spec: RevisionSpec{ + TimeoutSeconds: ptr.Int64(config.DefaultRevisionTimeoutSeconds), + ContainerConcurrency: ptr.Int64(config.DefaultContainerConcurrency), + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: config.DefaultUserContainerName + "-0", + Ports: []corev1.ContainerPort{{ + ContainerPort: 8080, + }}, + Resources: defaultResources, + ReadinessProbe: defaultProbe, + }, { + Name: config.DefaultUserContainerName + "-1", + Resources: defaultResources, + ReadinessProbe: nil, + }}, + }, + }, + }, }} for _, test := range tests { diff --git a/pkg/apis/serving/v1/revision_helpers.go b/pkg/apis/serving/v1/revision_helpers.go index e561c7ae6495..ade03840e3d7 100644 --- a/pkg/apis/serving/v1/revision_helpers.go +++ b/pkg/apis/serving/v1/revision_helpers.go @@ -71,6 +71,7 @@ const ( // It is never nil and should be exactly the specified container if len(containers) == 1 or // if there are multiple containers it returns the container which has Ports // as guaranteed by validation. +// Note: If you change this function, also update GetSidecarContainers. func (rs *RevisionSpec) GetContainer() *corev1.Container { switch { case len(rs.Containers) == 1: @@ -86,6 +87,24 @@ func (rs *RevisionSpec) GetContainer() *corev1.Container { return &corev1.Container{} } +// GetSidecarContainers returns a slice of pointers to all sidecar containers. +// If len(containers) == 1 OR only one container with a user-port exists, it will return an empty slice. +// It is the "rest" of GetContainer. 
+func (rs *RevisionSpec) GetSidecarContainers() []*corev1.Container { + sidecars := []*corev1.Container{} + if len(rs.Containers) == 1 { + return sidecars + } + + for i, c := range rs.Containers { + if len(c.Ports) == 0 { + sidecars = append(sidecars, &rs.Containers[i]) + } + } + + return sidecars +} + // SetRoutingState sets the routingState label on this Revision and updates the // routingStateModified annotation. func (r *Revision) SetRoutingState(state RoutingState, tm time.Time) { diff --git a/pkg/apis/serving/v1/revision_helpers_test.go b/pkg/apis/serving/v1/revision_helpers_test.go index 65feea02ed5f..0c85d6140bda 100644 --- a/pkg/apis/serving/v1/revision_helpers_test.go +++ b/pkg/apis/serving/v1/revision_helpers_test.go @@ -225,6 +225,62 @@ func TestGetContainer(t *testing.T) { } } +func TestGetSidecarContainers(t *testing.T) { + cases := []struct { + name string + status RevisionSpec + want []*corev1.Container + }{{ + name: "empty revisionSpec should return empty slice", + status: RevisionSpec{}, + want: []*corev1.Container{}, + }, { + name: "single container should return empty slice", + status: RevisionSpec{ + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "user-container", + Image: "foo", + }}, + }, + }, + want: []*corev1.Container{}, + }, { + name: "get sidecars and not user-container", + status: RevisionSpec{ + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "user-container", + Image: "firstImage", + Ports: []corev1.ContainerPort{{ + ContainerPort: 8888, + }}, + }, { + Name: "secondContainer", + Image: "secondImage", + }, { + Name: "thirdContainer", + Image: "thirdImage", + }}, + }, + }, + want: []*corev1.Container{{ + Name: "secondContainer", + Image: "secondImage", + }, { + Name: "thirdContainer", + Image: "thirdImage", + }}, + }} + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if want, got := tc.want, tc.status.GetSidecarContainers(); !equality.Semantic.DeepEqual(want, got) { + t.Errorf("GetSidecarContainers: %v want: %v", got, want) + } + }) + } +} + func TestSetRoutingState(t *testing.T) { rev := &Revision{} empty := time.Time{} diff --git a/pkg/queue/readiness/probe.go b/pkg/queue/readiness/probe.go index 8684e7cb6a22..b5b09636ccca 100644 --- a/pkg/queue/readiness/probe.go +++ b/pkg/queue/readiness/probe.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "knative.dev/serving/pkg/queue/health" @@ -37,13 +38,10 @@ const ( retryInterval = 50 * time.Millisecond ) -// Probe wraps a corev1.Probe along with a count of consecutive, successful probes +// Probe holds all wrapped *corev1.Probe along with a barrier to sync single probing execution type Probe struct { - *corev1.Probe - count int32 - pollTimeout time.Duration // To make tests not run for 10 seconds. - out io.Writer // To make tests not log errors in good cases. - autoDetectHTTP2 bool // Feature gate to enable HTTP2 auto-detection. + probes []*wrappedProbe + out io.Writer // To make tests not log errors in good cases. // Barrier sync to ensure only one probe is happening at the same time. // When a probe is active `gv` will be non-nil. @@ -52,6 +50,14 @@ type Probe struct { gv *gateValue } +type wrappedProbe struct { + *corev1.Probe + count int32 + pollTimeout time.Duration // To make tests not run for 10 seconds. + out io.Writer // To make tests not log errors in good cases. + autoDetectHTTP2 bool // Feature gate to enable HTTP2 auto-detection. 
+}
+
 // gateValue is a write-once boolean impl.
 type gateValue struct {
 	broadcast chan struct{}
@@ -79,27 +85,41 @@ func (gv *gateValue) read() bool {
 }
 
 // NewProbe returns a pointer to a new Probe.
-func NewProbe(v1p *corev1.Probe) *Probe {
+func NewProbe(probes []*corev1.Probe) *Probe {
+	wrappedProbes := []*wrappedProbe{}
+	for _, p := range probes {
+		wrappedProbes = append(wrappedProbes, &wrappedProbe{
+			Probe:       p,
+			out:         os.Stderr,
+			pollTimeout: PollTimeout,
+		})
+	}
 	return &Probe{
-		Probe:       v1p,
-		pollTimeout: PollTimeout,
-		out:         os.Stderr,
+		probes: wrappedProbes,
+		out:    os.Stderr,
 	}
 }
 
 // NewProbeWithHTTP2AutoDetection returns a pointer to a new Probe that has HTTP2
 // auto-detection enabled.
-func NewProbeWithHTTP2AutoDetection(v1p *corev1.Probe) *Probe {
+func NewProbeWithHTTP2AutoDetection(probes []*corev1.Probe) *Probe {
+	wrappedProbes := []*wrappedProbe{}
+	for _, p := range probes {
+		wrappedProbes = append(wrappedProbes, &wrappedProbe{
+			Probe:           p,
+			out:             os.Stderr,
+			pollTimeout:     PollTimeout,
+			autoDetectHTTP2: true,
+		})
+	}
 	return &Probe{
-		Probe:           v1p,
-		pollTimeout:     PollTimeout,
-		out:             os.Stderr,
-		autoDetectHTTP2: true,
+		probes: wrappedProbes,
+		out:    os.Stderr,
 	}
 }
 
 // shouldProbeAggressively indicates whether the Knative probe with aggressive retries should be used.
-func (p *Probe) shouldProbeAggressively() bool {
+func (p *wrappedProbe) shouldProbeAggressively() bool {
 	return p.PeriodSeconds == 0
 }
 
@@ -128,36 +148,39 @@ func (p *Probe) ProbeContainer() bool {
 }
 
 func (p *Probe) probeContainerImpl() bool {
-	var err error
-
-	switch {
-	case p.HTTPGet != nil:
-		err = p.httpProbe()
-	case p.TCPSocket != nil:
-		err = p.tcpProbe()
-	case p.GRPC != nil:
-		err = p.grpcProbe()
-	case p.Exec != nil:
-		// Should never be reachable. Exec probes to be translated to
-		// TCP probes when container is built.
-		// Using Fprintf for a concise error message in the event log.
-		fmt.Fprintln(p.out, "exec probe not supported")
-		return false
-	default:
-		// Using Fprintf for a concise error message in the event log.
-		fmt.Fprintln(p.out, "no probe found")
-		return false
+	var probeGroup errgroup.Group
+
+	for _, probe := range p.probes {
+		innerProbe := probe
+		probeGroup.Go(func() error {
+			switch {
+			case innerProbe.HTTPGet != nil:
+				return innerProbe.httpProbe()
+			case innerProbe.TCPSocket != nil:
+				return innerProbe.tcpProbe()
+			case innerProbe.GRPC != nil:
+				return innerProbe.grpcProbe()
+			case innerProbe.Exec != nil:
+				// Should never be reachable. Exec probes are translated to
+				// TCP probes when the container is built.
+				return fmt.Errorf("exec probe not supported")
+			default:
+				return fmt.Errorf("no probe found")
+			}
+		})
 	}
 
+	err := probeGroup.Wait()
 	if err != nil {
-		// Using Fprintf for a concise error message in the event log.
fmt.Fprintln(p.out, err.Error()) return false } + return true } -func (p *Probe) doProbe(probe func(time.Duration) error) error { +func (p *wrappedProbe) doProbe(probe func(time.Duration) error) error { if !p.shouldProbeAggressively() { return probe(time.Duration(p.TimeoutSeconds) * time.Second) } @@ -193,7 +216,7 @@ func (p *Probe) doProbe(probe func(time.Duration) error) error { // tcpProbe function executes TCP probe once if its standard probe // otherwise TCP probe polls condition function which returns true // if the probe count is greater than success threshold and false if TCP probe fails -func (p *Probe) tcpProbe() error { +func (p *wrappedProbe) tcpProbe() error { config := health.TCPProbeConfigOptions{ Address: p.TCPSocket.Host + ":" + p.TCPSocket.Port.String(), } @@ -207,7 +230,7 @@ func (p *Probe) tcpProbe() error { // httpProbe function executes HTTP probe once if its standard probe // otherwise HTTP probe polls condition function which returns true // if the probe count is greater than success threshold and false if HTTP probe fails -func (p *Probe) httpProbe() error { +func (p *wrappedProbe) httpProbe() error { config := health.HTTPProbeConfigOptions{ HTTPGetAction: p.HTTPGet, MaxProtoMajor: 1, @@ -225,7 +248,7 @@ func (p *Probe) httpProbe() error { } // grpcProbe function executes gRPC probe -func (p *Probe) grpcProbe() error { +func (p *wrappedProbe) grpcProbe() error { config := health.GRPCProbeConfigOptions{ GRPCAction: p.GRPC, } diff --git a/pkg/queue/readiness/probe_encoding.go b/pkg/queue/readiness/probe_encoding.go index 4f79b2ffb26e..78a2daedab72 100644 --- a/pkg/queue/readiness/probe_encoding.go +++ b/pkg/queue/readiness/probe_encoding.go @@ -23,17 +23,27 @@ import ( corev1 "k8s.io/api/core/v1" ) -// DecodeProbe takes a json serialised *corev1.Probe and returns a Probe or an error. -func DecodeProbe(jsonProbe string) (*corev1.Probe, error) { - p := &corev1.Probe{} - if err := json.Unmarshal([]byte(jsonProbe), p); err != nil { - return nil, err +// DecodeProbes takes a json serialised *corev1.Probe OR []*corev1.Probe (depending on multiContainerProbes) +// and returns a slice of probes or an error. +func DecodeProbes(jsonProbe string, multiContainerProbes bool) ([]*corev1.Probe, error) { + probes := []*corev1.Probe{} + if multiContainerProbes { + if err := json.Unmarshal([]byte(jsonProbe), &probes); err != nil { + return nil, err + } + } else { + p := &corev1.Probe{} + if err := json.Unmarshal([]byte(jsonProbe), &p); err != nil { + return nil, err + } + probes = append(probes, p) } - return p, nil + + return probes, nil } -// EncodeProbe takes *corev1.Probe object and returns marshalled Probe JSON string and an error. -func EncodeProbe(rp *corev1.Probe) (string, error) { +// EncodeSingleProbe takes a single *corev1.Probe object and returns marshalled Probe JSON string and an error. +func EncodeSingleProbe(rp *corev1.Probe) (string, error) { if rp == nil { return "", errors.New("cannot encode nil probe") } @@ -44,3 +54,16 @@ func EncodeProbe(rp *corev1.Probe) (string, error) { } return string(probeJSON), nil } + +// EncodeMultipleProbes takes []*corev1.Probe slice and returns marshalled slice of Probe JSON string and an error. 
+func EncodeMultipleProbes(rps []*corev1.Probe) (string, error) { + if len(rps) == 0 { + return "", errors.New("cannot encode nil or empty probes") + } + + probeJSON, err := json.Marshal(rps) + if err != nil { + return "", err + } + return string(probeJSON), nil +} diff --git a/pkg/queue/readiness/probe_encoding_test.go b/pkg/queue/readiness/probe_encoding_test.go index 52084116b950..6f28823a1f60 100644 --- a/pkg/queue/readiness/probe_encoding_test.go +++ b/pkg/queue/readiness/probe_encoding_test.go @@ -25,7 +25,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" ) -func TestParseProbeSuccess(t *testing.T) { +func TestParseSingleProbeSuccess(t *testing.T) { expectedProbe := &corev1.Probe{ PeriodSeconds: 1, TimeoutSeconds: 2, @@ -42,12 +42,49 @@ func TestParseProbeSuccess(t *testing.T) { if err != nil { t.Fatalf("Failed to decode probe %#v", err) } - gotProbe, err := DecodeProbe(string(probeBytes)) + gotProbes, err := DecodeProbes(string(probeBytes), false) if err != nil { - t.Fatalf("Failed DecodeProbe() %#v", err) + t.Fatalf("Failed DecodeProbes() %#v", err) } - if d := cmp.Diff(gotProbe, expectedProbe); d != "" { - t.Errorf("Probe diff %s; got %v, want %v", d, gotProbe, expectedProbe) + if d := cmp.Diff(gotProbes, []*corev1.Probe{expectedProbe}); d != "" { + t.Errorf("Probe diff %s; got %v, want %v", d, gotProbes, expectedProbe) + } +} + +func TestParseMultipleProbeSuccess(t *testing.T) { + expectedProbes := []*corev1.Probe{{ + PeriodSeconds: 1, + TimeoutSeconds: 2, + SuccessThreshold: 1, + FailureThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromString("8080"), + }, + }, + }, { + PeriodSeconds: 1, + TimeoutSeconds: 2, + SuccessThreshold: 1, + FailureThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromString("8090"), + }, + }, + }} + probeBytes, err := json.Marshal(expectedProbes) + if err != nil { + t.Fatalf("Failed to decode probe %#v", err) + } + gotProbes, err := DecodeProbes(string(probeBytes), true) + if err != nil { + t.Fatalf("Failed DecodeProbes() %#v", err) + } + if d := cmp.Diff(gotProbes, expectedProbes); d != "" { + t.Errorf("Probe diff %s; got %v, want %v", d, gotProbes, expectedProbes) } } @@ -56,13 +93,13 @@ func TestParseProbeFailure(t *testing.T) { if err != nil { t.Fatalf("Failed to decode probe %#v", err) } - _, err = DecodeProbe(string(probeBytes)) + _, err = DecodeProbes(string(probeBytes), false) if err == nil { - t.Fatal("Expected DecodeProbe() to fail") + t.Fatal("Expected DecodeProbes() to fail") } } -func TestEncodeProbe(t *testing.T) { +func TestEncodeSingleProbe(t *testing.T) { probe := &corev1.Probe{ SuccessThreshold: 1, ProbeHandler: corev1.ProbeHandler{ @@ -73,7 +110,7 @@ func TestEncodeProbe(t *testing.T) { }, } - jsonProbe, err := EncodeProbe(probe) + jsonProbe, err := EncodeSingleProbe(probe) if err != nil { t.Fatalf("Expected no errer, got: %#v", err) @@ -86,8 +123,64 @@ func TestEncodeProbe(t *testing.T) { } } -func TestEncodeNilProbe(t *testing.T) { - jsonProbe, err := EncodeProbe(nil) +func TestEncodeMultipleProbes(t *testing.T) { + probes := []*corev1.Probe{{ + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromString("8080"), + }, + }, + }, { + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Host: "127.0.0.1", + Port: intstr.FromString("8090"), + }, + }, + }} + 
+ jsonProbe, err := EncodeMultipleProbes(probes) + + if err != nil { + t.Fatalf("Expected no errer, got: %#v", err) + } + + wantProbe := `[{"tcpSocket":{"port":"8080","host":"127.0.0.1"},"successThreshold":1},{"tcpSocket":{"port":"8090","host":"127.0.0.1"},"successThreshold":1}]` + + if diff := cmp.Diff(jsonProbe, wantProbe); diff != "" { + t.Errorf("Probe diff: %s; got %v, want %v", diff, jsonProbe, wantProbe) + } +} + +func TestEncodeSingleNilProbe(t *testing.T) { + jsonProbe, err := EncodeSingleProbe(nil) + + if err == nil { + t.Error("Expected error") + } + + if jsonProbe != "" { + t.Error("Expected empty probe string; got", jsonProbe) + } +} + +func TestEncodeMultipleNilProbe(t *testing.T) { + jsonProbe, err := EncodeMultipleProbes(nil) + + if err == nil { + t.Error("Expected error") + } + + if jsonProbe != "" { + t.Error("Expected empty probe string; got", jsonProbe) + } +} + +func TestEncodeMultipleEmptyProbe(t *testing.T) { + jsonProbe, err := EncodeMultipleProbes([]*corev1.Probe{}) if err == nil { t.Error("Expected error") diff --git a/pkg/queue/readiness/probe_test.go b/pkg/queue/readiness/probe_test.go index 52d76383a130..67a63112be57 100644 --- a/pkg/queue/readiness/probe_test.go +++ b/pkg/queue/readiness/probe_test.go @@ -29,12 +29,12 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "go.uber.org/atomic" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" ) @@ -48,24 +48,24 @@ func TestNewProbe(t *testing.T) { ProbeHandler: corev1.ProbeHandler{ TCPSocket: &corev1.TCPSocketAction{ Host: "127.0.0.1", - Port: intstr.FromInt(12345), + Port: intstr.FromInt32(12345), }, }, } - p := NewProbe(v1p) + p := NewProbe([]*corev1.Probe{v1p}) - if diff := cmp.Diff(p.Probe, v1p); diff != "" { + if diff := cmp.Diff(p.probes[0].Probe, v1p); diff != "" { t.Error("NewProbe (-want, +got) =", diff) } - if c := p.count; c != 0 { + if c := p.probes[0].count; c != 0 { t.Error("Expected Probe.Count == 0, got:", c) } } func TestTCPFailure(t *testing.T) { - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 1, TimeoutSeconds: 1, SuccessThreshold: 1, @@ -73,10 +73,10 @@ func TestTCPFailure(t *testing.T) { ProbeHandler: corev1.ProbeHandler{ TCPSocket: &corev1.TCPSocketAction{ Host: "127.0.0.1", - Port: intstr.FromInt(12345), + Port: intstr.FromInt32(12345), }, }, - }) + }}) if pb.ProbeContainer() { t.Error("Reported success when no server was available for connection") @@ -84,7 +84,7 @@ func TestTCPFailure(t *testing.T) { } func TestAggressiveFailureOnlyLogsOnce(t *testing.T) { - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 0, // Aggressive probe. TimeoutSeconds: 1, SuccessThreshold: 1, @@ -92,17 +92,17 @@ func TestAggressiveFailureOnlyLogsOnce(t *testing.T) { ProbeHandler: corev1.ProbeHandler{ TCPSocket: &corev1.TCPSocketAction{ Host: "127.0.0.1", - Port: intstr.FromInt(12345), + Port: intstr.FromInt32(12345), }, }, - }) + }}) // Make the poll timeout a ton shorter but long enough to potentially observe // multiple log lines. 
- pb.pollTimeout = retryInterval * 3 + pb.probes[0].pollTimeout = retryInterval * 3 var buf bytes.Buffer - pb.out = &buf + pb.probes[0].out = &buf pb.ProbeContainer() if got := strings.Count(buf.String(), "aggressive probe error"); got != 1 { @@ -123,7 +123,7 @@ func TestAggressiveFailureNotLoggedOnSuccess(t *testing.T) { w.WriteHeader(http.StatusInternalServerError) }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 0, // Aggressive probe. TimeoutSeconds: 1, SuccessThreshold: 1, @@ -135,7 +135,7 @@ func TestAggressiveFailureNotLoggedOnSuccess(t *testing.T) { Port: intstr.FromString(tsURL.Port()), }, }, - }) + }}) var buf bytes.Buffer pb.out = &buf @@ -147,13 +147,13 @@ func TestAggressiveFailureNotLoggedOnSuccess(t *testing.T) { } func TestEmptyHandler(t *testing.T) { - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 1, TimeoutSeconds: 1, SuccessThreshold: 1, FailureThreshold: 1, ProbeHandler: corev1.ProbeHandler{}, - }) + }}) if pb.ProbeContainer() { t.Error("Reported success when no handler was configured.") @@ -161,7 +161,7 @@ func TestEmptyHandler(t *testing.T) { } func TestExecHandler(t *testing.T) { - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 1, TimeoutSeconds: 1, SuccessThreshold: 1, @@ -170,7 +170,7 @@ func TestExecHandler(t *testing.T) { Exec: &corev1.ExecAction{ Command: []string{"echo", "hello"}, }}, - }) + }}) if pb.ProbeContainer() { t.Error("Expected ExecProbe to always fail") @@ -182,7 +182,7 @@ func TestTCPSuccess(t *testing.T) { w.WriteHeader(http.StatusOK) }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 1, TimeoutSeconds: 2, SuccessThreshold: 1, @@ -193,7 +193,7 @@ func TestTCPSuccess(t *testing.T) { Port: intstr.FromString(tsURL.Port()), }, }, - }) + }}) if !pb.ProbeContainer() { t.Error("Probe report failure. Expected success.") @@ -201,7 +201,7 @@ func TestTCPSuccess(t *testing.T) { } func TestHTTPFailureToConnect(t *testing.T) { - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 1, TimeoutSeconds: 2, SuccessThreshold: 1, @@ -209,11 +209,11 @@ func TestHTTPFailureToConnect(t *testing.T) { ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ Host: "127.0.0.1", - Port: intstr.FromInt(12345), + Port: intstr.FromInt32(12345), Scheme: corev1.URISchemeHTTP, }, }, - }) + }}) if pb.ProbeContainer() { t.Error("Reported success when no server was available for connection") @@ -225,7 +225,7 @@ func TestHTTPBadResponse(t *testing.T) { w.WriteHeader(http.StatusBadRequest) }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 1, TimeoutSeconds: 5, SuccessThreshold: 1, @@ -237,7 +237,7 @@ func TestHTTPBadResponse(t *testing.T) { Scheme: corev1.URISchemeHTTP, }, }, - }) + }}) if pb.ProbeContainer() { t.Error("Reported success when server replied with Bad Request") @@ -249,7 +249,7 @@ func TestHTTPSuccess(t *testing.T) { w.WriteHeader(http.StatusOK) }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{&corev1.Probe{ PeriodSeconds: 1, TimeoutSeconds: 5, SuccessThreshold: 1, @@ -261,7 +261,7 @@ func TestHTTPSuccess(t *testing.T) { Scheme: corev1.URISchemeHTTP, }, }, - }) + }}) if !pb.ProbeContainer() { t.Error("Probe failed. 
Expected success.") @@ -277,7 +277,7 @@ func TestHTTPManyParallel(t *testing.T) { } }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 1, TimeoutSeconds: 5, SuccessThreshold: 1, @@ -289,7 +289,7 @@ func TestHTTPManyParallel(t *testing.T) { Scheme: corev1.URISchemeHTTP, }, }, - }) + }}) var grp errgroup.Group for i := 0; i < 2; i++ { @@ -323,7 +323,7 @@ func TestHTTPTimeout(t *testing.T) { w.WriteHeader(http.StatusOK) }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 1, TimeoutSeconds: 1, SuccessThreshold: 1, @@ -335,7 +335,7 @@ func TestHTTPTimeout(t *testing.T) { Scheme: corev1.URISchemeHTTP, }, }, - }) + }}) if pb.ProbeContainer() { t.Error("Probe succeeded. Expected failure due to timeout.") @@ -348,7 +348,7 @@ func TestHTTPSuccessWithDelay(t *testing.T) { w.WriteHeader(http.StatusOK) }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 1, TimeoutSeconds: 2, SuccessThreshold: 1, @@ -360,13 +360,130 @@ func TestHTTPSuccessWithDelay(t *testing.T) { Scheme: corev1.URISchemeHTTP, }, }, - }) + }}) if !pb.ProbeContainer() { t.Error("Probe failed. Wanted success.") } } +func TestMultipleHTTPSuccess(t *testing.T) { + tsURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + tsURL2 := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + pb := NewProbe([]*corev1.Probe{{ + PeriodSeconds: 1, + TimeoutSeconds: 5, + SuccessThreshold: 1, + FailureThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Host: tsURL.Hostname(), + Port: intstr.FromString(tsURL.Port()), + Scheme: corev1.URISchemeHTTP, + }, + }, + }, { + PeriodSeconds: 1, + TimeoutSeconds: 5, + SuccessThreshold: 1, + FailureThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Host: tsURL2.Hostname(), + Port: intstr.FromString(tsURL2.Port()), + Scheme: corev1.URISchemeHTTP, + }, + }, + }}) + + if !pb.ProbeContainer() { + t.Error("Probe failed. Expected success.") + } +} + +func TestMultipleHTTPFirstFailing(t *testing.T) { + tsURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }) + tsURL2 := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + pb := NewProbe([]*corev1.Probe{{ + PeriodSeconds: 1, + TimeoutSeconds: 5, + SuccessThreshold: 1, + FailureThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Host: tsURL.Hostname(), + Port: intstr.FromString(tsURL.Port()), + Scheme: corev1.URISchemeHTTP, + }, + }, + }, { + PeriodSeconds: 1, + TimeoutSeconds: 5, + SuccessThreshold: 1, + FailureThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Host: tsURL2.Hostname(), + Port: intstr.FromString(tsURL2.Port()), + Scheme: corev1.URISchemeHTTP, + }, + }, + }}) + + if pb.ProbeContainer() { + t.Error("Probe succeeded. 
Expected failure.") + } +} + +func TestMultipleHTTPFirstSecond(t *testing.T) { + tsURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + tsURL2 := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }) + + pb := NewProbe([]*corev1.Probe{{ + PeriodSeconds: 1, + TimeoutSeconds: 5, + SuccessThreshold: 1, + FailureThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Host: tsURL.Hostname(), + Port: intstr.FromString(tsURL.Port()), + Scheme: corev1.URISchemeHTTP, + }, + }, + }, { + PeriodSeconds: 1, + TimeoutSeconds: 5, + SuccessThreshold: 1, + FailureThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Host: tsURL2.Hostname(), + Port: intstr.FromString(tsURL2.Port()), + Scheme: corev1.URISchemeHTTP, + }, + }, + }}) + + if pb.ProbeContainer() { + t.Error("Probe succeeded. Expected failure.") + } +} + func TestKnHTTPSuccessWithRetry(t *testing.T) { var count atomic.Int32 tsURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { @@ -378,7 +495,7 @@ func TestKnHTTPSuccessWithRetry(t *testing.T) { w.WriteHeader(http.StatusOK) }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 0, TimeoutSeconds: 0, SuccessThreshold: 1, @@ -390,7 +507,7 @@ func TestKnHTTPSuccessWithRetry(t *testing.T) { Scheme: corev1.URISchemeHTTP, }, }, - }) + }}) if !pb.ProbeContainer() { t.Error("Probe failed. Expected success after retry.") @@ -406,7 +523,7 @@ func TestKnHTTPSuccessWithThreshold(t *testing.T) { w.WriteHeader(http.StatusOK) }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 0, TimeoutSeconds: 0, SuccessThreshold: threshold, @@ -418,7 +535,7 @@ func TestKnHTTPSuccessWithThreshold(t *testing.T) { Scheme: corev1.URISchemeHTTP, }, }, - }) + }}) if !pb.ProbeContainer() { t.Error("Expected success after second attempt.") @@ -443,7 +560,7 @@ func TestKnHTTPSuccessWithThresholdAndFailure(t *testing.T) { w.WriteHeader(http.StatusOK) }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 0, TimeoutSeconds: 0, SuccessThreshold: threshold, @@ -459,7 +576,7 @@ func TestKnHTTPSuccessWithThresholdAndFailure(t *testing.T) { Scheme: corev1.URISchemeHTTP, }, }, - }) + }}) if !pb.ProbeContainer() { t.Error("Expected success.") @@ -480,7 +597,7 @@ func TestKnHTTPTimeoutFailure(t *testing.T) { w.WriteHeader(http.StatusOK) }) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{&corev1.Probe{ PeriodSeconds: 0, TimeoutSeconds: 0, SuccessThreshold: 1, @@ -492,8 +609,8 @@ func TestKnHTTPTimeoutFailure(t *testing.T) { Scheme: corev1.URISchemeHTTP, }, }, - }) - pb.pollTimeout = retryInterval + }}) + pb.probes[0].pollTimeout = retryInterval var logs bytes.Buffer pb.out = &logs @@ -510,7 +627,7 @@ func TestKnTCPProbeSuccess(t *testing.T) { defer listener.Close() addr := listener.Addr().(*net.TCPAddr) - pb := NewProbe(&corev1.Probe{ + pb := NewProbe([]*corev1.Probe{{ PeriodSeconds: 0, TimeoutSeconds: 0, SuccessThreshold: 1, @@ -518,10 +635,10 @@ func TestKnTCPProbeSuccess(t *testing.T) { ProbeHandler: corev1.ProbeHandler{ TCPSocket: &corev1.TCPSocketAction{ Host: "127.0.0.1", - Port: intstr.FromInt(addr.Port), + Port: intstr.FromInt32(int32(addr.Port)), }, }, - }) + }}) if !pb.ProbeContainer() { t.Error("Got probe error. 
@@ -529,13 +646,13 @@
 	}
 }
 
 func TestKnUnimplementedProbe(t *testing.T) {
-	pb := NewProbe(&corev1.Probe{
+	pb := NewProbe([]*corev1.Probe{{
 		PeriodSeconds:    0,
 		TimeoutSeconds:   0,
 		SuccessThreshold: 1,
 		FailureThreshold: 0,
 		ProbeHandler:     corev1.ProbeHandler{},
-	})
+	}})
 
 	if pb.ProbeContainer() {
 		t.Error("Got probe success. Wanted failure.")
@@ -543,7 +660,7 @@ func TestKnTCPProbeFailure(t *testing.T) {
-	pb := NewProbe(&corev1.Probe{
+	pb := NewProbe([]*corev1.Probe{{
 		PeriodSeconds:    0,
 		TimeoutSeconds:   0,
 		SuccessThreshold: 1,
@@ -554,8 +671,8 @@ func TestKnTCPProbeFailure(t *testing.T) {
 			Port: intstr.FromInt(12345),
 		},
 	},
-	})
-	pb.pollTimeout = retryInterval
+	}})
+	pb.probes[0].pollTimeout = retryInterval
 
 	var logs bytes.Buffer
 	pb.out = &logs
@@ -572,7 +689,7 @@ func TestKnTCPProbeSuccessWithThreshold(t *testing.T) {
 	defer listener.Close()
 	addr := listener.Addr().(*net.TCPAddr)
 
-	pb := NewProbe(&corev1.Probe{
+	pb := NewProbe([]*corev1.Probe{{
 		PeriodSeconds:    0,
 		TimeoutSeconds:   0,
 		SuccessThreshold: 3,
@@ -580,16 +697,16 @@ func TestKnTCPProbeSuccessWithThreshold(t *testing.T) {
 		ProbeHandler: corev1.ProbeHandler{
 			TCPSocket: &corev1.TCPSocketAction{
 				Host: "127.0.0.1",
-				Port: intstr.FromInt(addr.Port),
+				Port: intstr.FromInt32(int32(addr.Port)),
 			},
 		},
-	})
+	}})
 
 	if !pb.ProbeContainer() {
 		t.Error("Got probe error. Wanted success.")
 	}
 
-	if got := pb.count; got < 3 {
+	if got := pb.probes[0].count; got < 3 {
 		t.Errorf("Count = %d, want: 3", got)
 	}
 }
@@ -602,7 +719,7 @@ func TestKnTCPProbeSuccessThresholdIncludesFailure(t *testing.T) {
 	addr := listener.Addr().(*net.TCPAddr)
 
 	var successThreshold int32 = 3
-	pb := NewProbe(&corev1.Probe{
+	pb := NewProbe([]*corev1.Probe{{
 		PeriodSeconds:    0,
 		TimeoutSeconds:   0,
 		SuccessThreshold: successThreshold,
@@ -610,10 +727,10 @@ func TestKnTCPProbeSuccessThresholdIncludesFailure(t *testing.T) {
 		ProbeHandler: corev1.ProbeHandler{
 			TCPSocket: &corev1.TCPSocketAction{
 				Host: "127.0.0.1",
-				Port: intstr.FromInt(addr.Port),
+				Port: intstr.FromInt32(int32(addr.Port)),
 			},
 		},
-	})
+	}})
 
 	connCount := 0
 	const desiredConnCount = 4 // 1 conn from 1st server, 3 from 2nd server
@@ -653,7 +770,7 @@
 	if probeErr := <-errChan; !probeErr {
 		t.Error("Wanted ProbeContainer() to succeed, but got error")
 	}
-	if got := pb.count; got < successThreshold {
+	if got := pb.probes[0].count; got < successThreshold {
 		t.Errorf("Count = %d, want: %d", got, successThreshold)
 	}
 }
@@ -675,7 +792,7 @@ func TestGRPCSuccess(t *testing.T) {
 	}()
 	assignedPort := lis.Addr().(*net.TCPAddr).Port
 
-	pb := NewProbe(&corev1.Probe{
+	pb := NewProbe([]*corev1.Probe{{
 		PeriodSeconds:    1,
 		TimeoutSeconds:   5,
 		SuccessThreshold: 1,
@@ -686,7 +803,7 @@ func TestGRPCSuccess(t *testing.T) {
 			Service: nil,
 		},
 	},
-	})
+	}})
 
 	if !pb.ProbeContainer() {
 		t.Error("Probe failed. Expected success.")
diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go
index 7d64c3511b09..b8d035c1081e 100644
--- a/pkg/queue/sharedmain/main.go
+++ b/pkg/queue/sharedmain/main.go
@@ -87,8 +87,9 @@ type config struct {
 	ServingReadinessProbe string `split_words:"true"` // optional
 	EnableProfiling       bool   `split_words:"true"` // optional
 	// See https://github.com/knative/serving/issues/12387
-	EnableHTTPFullDuplex     bool `split_words:"true"`                      // optional
-	EnableHTTP2AutoDetection bool `envconfig:"ENABLE_HTTP2_AUTO_DETECTION"` // optional
+	EnableHTTPFullDuplex       bool `split_words:"true"`                      // optional
+	EnableHTTP2AutoDetection   bool `envconfig:"ENABLE_HTTP2_AUTO_DETECTION"` // optional
+	EnableMultiContainerProbes bool `split_words:"true"`
 
 	// Logging configuration
 	ServingLoggingConfig string `split_words:"true" required:"true"`
@@ -227,7 +228,7 @@ func Main(opts ...Option) error {
 	// Setup probe to run for checking user-application healthiness.
 	probe := func() bool { return true }
 	if env.ServingReadinessProbe != "" {
-		probe = buildProbe(logger, env.ServingReadinessProbe, env.EnableHTTP2AutoDetection).ProbeContainer
+		probe = buildProbe(logger, env.ServingReadinessProbe, env.EnableHTTP2AutoDetection, env.EnableMultiContainerProbes).ProbeContainer
 	}
 
 	// Enable TLS when certificate is mounted.
@@ -331,15 +332,15 @@ func exists(logger *zap.SugaredLogger, filename string) bool {
 	return err == nil
 }
 
-func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 bool) *readiness.Probe {
-	coreProbe, err := readiness.DecodeProbe(encodedProbe)
+func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 bool, multiContainerProbes bool) *readiness.Probe {
+	coreProbes, err := readiness.DecodeProbes(encodedProbe, multiContainerProbes)
 	if err != nil {
 		logger.Fatalw("Queue container failed to parse readiness probe", zap.Error(err))
 	}
 	if autodetectHTTP2 {
-		return readiness.NewProbeWithHTTP2AutoDetection(coreProbe)
+		return readiness.NewProbeWithHTTP2AutoDetection(coreProbes)
 	}
-	return readiness.NewProbe(coreProbe)
+	return readiness.NewProbe(coreProbes)
 }
 
 func buildTransport(env config) http.RoundTripper {
diff --git a/pkg/reconciler/revision/resources/deploy.go b/pkg/reconciler/revision/resources/deploy.go
index 4dddee1b94ad..6a218052c398 100644
--- a/pkg/reconciler/revision/resources/deploy.go
+++ b/pkg/reconciler/revision/resources/deploy.go
@@ -139,13 +139,12 @@ func certVolume(secret string) corev1.Volume {
 	}
 }
 
-func rewriteUserProbe(p *corev1.Probe, userPort int) {
+func addLivenessProbeHeader(p *corev1.Probe) {
 	if p == nil {
 		return
 	}
 	switch {
 	case p.HTTPGet != nil:
-		p.HTTPGet.Port = intstr.FromInt(userPort)
 		// With mTLS enabled, Istio rewrites probes, but doesn't spoof the kubelet
 		// user agent, so we need to inject an extra header to be able to distinguish
 		// between probes and real requests.
@@ -153,6 +152,16 @@ func rewriteUserProbe(p *corev1.Probe, userPort int) {
 			Name:  netheader.KubeletProbeKey,
 			Value: queue.Name,
 		})
+	}
+}
+
+func rewriteUserLivenessProbe(p *corev1.Probe, userPort int) {
+	if p == nil {
+		return
+	}
+	switch {
+	case p.HTTPGet != nil:
+		p.HTTPGet.Port = intstr.FromInt(userPort)
 	case p.TCPSocket != nil:
 		p.TCPSocket.Port = intstr.FromInt(userPort)
 	}
@@ -256,6 +265,19 @@ func makeContainer(container corev1.Container, rev *v1.Revision) corev1.Containe
 	if container.TerminationMessagePolicy == "" {
 		container.TerminationMessagePolicy = corev1.TerminationMessageFallbackToLogsOnError
 	}
+
+	if container.ReadinessProbe != nil {
+		if container.ReadinessProbe.HTTPGet != nil || container.ReadinessProbe.TCPSocket != nil || container.ReadinessProbe.GRPC != nil {
+			// HTTP, TCP and gRPC ReadinessProbes are executed by the queue-proxy directly against the
+			// container instead of via kubelet.
+			container.ReadinessProbe = nil
+		}
+	}
+
+	if container.LivenessProbe != nil {
+		addLivenessProbeHeader(container.LivenessProbe)
+	}
+
 	return container
 }
 
@@ -266,15 +288,8 @@ func makeServingContainer(servingContainer corev1.Container, rev *v1.Revision) c
 	servingContainer.Ports = buildContainerPorts(userPort)
 	servingContainer.Env = append(servingContainer.Env, buildUserPortEnv(userPortStr))
 	container := makeContainer(servingContainer, rev)
-	if container.ReadinessProbe != nil {
-		if container.ReadinessProbe.HTTPGet != nil || container.ReadinessProbe.TCPSocket != nil || container.ReadinessProbe.GRPC != nil {
-			// HTTP, TCP and gRPC ReadinessProbes are executed by the queue-proxy directly against the
-			// user-container instead of via kubelet.
-			container.ReadinessProbe = nil
-		}
-	}
-	// If the client provides probes, we should fill in the port for them.
-	rewriteUserProbe(container.LivenessProbe, int(userPort))
+	// If the user provides a liveness probe, we should rewrite its port to the user container's port for them.
+	rewriteUserLivenessProbe(container.LivenessProbe, int(userPort))
 	return container
 }
diff --git a/pkg/reconciler/revision/resources/deploy_test.go b/pkg/reconciler/revision/resources/deploy_test.go
index db80265161b1..7613079b9e69 100644
--- a/pkg/reconciler/revision/resources/deploy_test.go
+++ b/pkg/reconciler/revision/resources/deploy_test.go
@@ -194,6 +194,9 @@
 		}, {
 			Name:  "ROOT_CA",
 			Value: "",
+		}, {
+			Name:  "ENABLE_MULTI_CONTAINER_PROBES",
+			Value: "false",
 		}},
 	}
 
@@ -1373,6 +1376,43 @@ func TestMakePodSpec(t *testing.T) {
 			),
 		},
 	),
+	}, {
+		name: "with multiple containers with readiness probes",
+		rev: revision("bar", "foo",
+			withContainers([]corev1.Container{{
+				Name:           servingContainerName,
+				Image:          "busybox",
+				Ports:          buildContainerPorts(v1.DefaultUserPort),
+				ReadinessProbe: withHTTPReadinessProbe(v1.DefaultUserPort),
+			}, {
+				Name:           sidecarContainerName,
+				Image:          "Ubuntu",
+				ReadinessProbe: withHTTPReadinessProbe(8090),
+			}}),
+			WithContainerStatuses([]v1.ContainerStatus{{
+				ImageDigest: "busybox@sha256:deadbeef",
+			}, {
+				ImageDigest: "ubuntu@sha256:deadbffe",
+			}}),
+		),
+		fc: apicfg.Features{
+			MultiContainerProbing: apicfg.Enabled,
+		},
+		want: podSpec(
+			[]corev1.Container{
+				servingContainer(func(container *corev1.Container) {
+					container.Image = "busybox@sha256:deadbeef"
+				}),
+				sidecarContainer(sidecarContainerName,
+					func(container *corev1.Container) {
+						container.Image = "ubuntu@sha256:deadbffe"
+					},
+				),
+				queueContainer(
+					withEnvVar("ENABLE_MULTI_CONTAINER_PROBES", "true"),
+					withEnvVar("SERVING_READINESS_PROBE", `[{"httpGet":{"path":"/","port":8080,"host":"127.0.0.1","scheme":"HTTP","httpHeaders":[{"name":"K-Kubelet-Probe","value":"queue"}]}},{"httpGet":{"path":"/","port":8090,"host":"127.0.0.1","scheme":"HTTP","httpHeaders":[{"name":"K-Kubelet-Probe","value":"queue"}]}}]`),
+				),
+			}),
 	}}
 
 	for _, test := range tests {
diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go
index 41ac45deae24..f437bb7243e7 100644
--- a/pkg/reconciler/revision/resources/queue.go
+++ b/pkg/reconciler/revision/resources/queue.go
@@ -257,42 +257,36 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container
 	}
 	ports = append(ports, servingPort, queueHTTPSPort)
 
-	container := rev.Spec.GetContainer()
-
-	var httpProbe, execProbe *corev1.Probe
-	var userProbeJSON string
-	if container.ReadinessProbe != nil {
+	// User container (and queue-proxy) readiness probe
+	userContainer := rev.Spec.GetContainer()
+	var queueProxyReadinessProbe, userContainerReadinessProbe *corev1.Probe
+	if userContainer.ReadinessProbe != nil {
 		probePort := userPort
-		if container.ReadinessProbe.HTTPGet != nil && container.ReadinessProbe.HTTPGet.Port.IntValue() != 0 {
-			probePort = container.ReadinessProbe.HTTPGet.Port.IntVal
+		if userContainer.ReadinessProbe.HTTPGet != nil && userContainer.ReadinessProbe.HTTPGet.Port.IntValue() != 0 {
+			probePort = userContainer.ReadinessProbe.HTTPGet.Port.IntVal
 		}
-		if container.ReadinessProbe.TCPSocket != nil && container.ReadinessProbe.TCPSocket.Port.IntValue() != 0 {
-			probePort = container.ReadinessProbe.TCPSocket.Port.IntVal
+		if userContainer.ReadinessProbe.TCPSocket != nil && userContainer.ReadinessProbe.TCPSocket.Port.IntValue() != 0 {
+			probePort = userContainer.ReadinessProbe.TCPSocket.Port.IntVal
 		}
-		if container.ReadinessProbe.GRPC != nil && container.ReadinessProbe.GRPC.Port > 0 {
-			probePort = container.ReadinessProbe.GRPC.Port
+		if userContainer.ReadinessProbe.GRPC != nil && userContainer.ReadinessProbe.GRPC.Port > 0 {
+			probePort = userContainer.ReadinessProbe.GRPC.Port
 		}
+
 		// The activator attempts to detect readiness itself by checking the Queue
 		// Proxy's health endpoint rather than waiting for Kubernetes to check and
-		// propagate the Ready state. We encode the original probe as JSON in an
+		// propagate the Ready state. We encode the original readiness probes as JSON in an
 		// environment variable for this health endpoint to use.
-		userProbe := container.ReadinessProbe.DeepCopy()
-		applyReadinessProbeDefaultsForExec(userProbe, probePort)
-
-		var err error
-		userProbeJSON, err = readiness.EncodeProbe(userProbe)
-		if err != nil {
-			return nil, fmt.Errorf("failed to serialize readiness probe: %w", err)
-		}
+		userContainerReadinessProbe = userContainer.ReadinessProbe.DeepCopy()
+		applyReadinessProbeDefaults(userContainerReadinessProbe, probePort)
 
 		// After startup we'll directly use the same http health check endpoint the
 		// execprobe would have used (which will then check the user container).
 		// Unlike the StartupProbe, we don't need to override any of the other settings
 		// except period here. See below.
-		httpProbe = container.ReadinessProbe.DeepCopy()
-		httpProbe.ProbeHandler = corev1.ProbeHandler{
+		queueProxyReadinessProbe = userContainer.ReadinessProbe.DeepCopy()
+		queueProxyReadinessProbe.ProbeHandler = corev1.ProbeHandler{
 			HTTPGet: &corev1.HTTPGetAction{
-				Port: intstr.FromInt(int(servingPort.ContainerPort)),
+				Port: intstr.FromInt32(servingPort.ContainerPort),
 				HTTPHeaders: []corev1.HTTPHeader{{
 					Name:  netheader.ProbeKey,
 					Value: queue.Name,
@@ -301,16 +295,56 @@
 		}
 	}
 
+	// Sidecar readiness probes
+	multiContainerProbingEnabled := cfg.Features.MultiContainerProbing == apicfg.Enabled
+	readinessProbes := []*corev1.Probe{userContainerReadinessProbe}
+	if multiContainerProbingEnabled {
+		for _, sc := range rev.Spec.GetSidecarContainers() {
+			if sc.ReadinessProbe != nil {
+				var probePort int32
+				switch {
+				case sc.ReadinessProbe.HTTPGet != nil && sc.ReadinessProbe.HTTPGet.Port.IntValue() != 0:
+					probePort = sc.ReadinessProbe.HTTPGet.Port.IntVal
+				case sc.ReadinessProbe.TCPSocket != nil && sc.ReadinessProbe.TCPSocket.Port.IntValue() != 0:
+					probePort = sc.ReadinessProbe.TCPSocket.Port.IntVal
+				case sc.ReadinessProbe.GRPC != nil && sc.ReadinessProbe.GRPC.Port > 0:
+					probePort = sc.ReadinessProbe.GRPC.Port
+				default:
+					return nil, fmt.Errorf("sidecar readiness probe does not define probe port on container: %s", sc.Name)
+				}
+				scProbe := sc.ReadinessProbe.DeepCopy()
+				applyReadinessProbeDefaults(scProbe, probePort)
+				readinessProbes = append(readinessProbes, scProbe)
+			}
+		}
+	}
+
+	// Encode the readiness probe(s)
+	var readinessProbeJSON string
+	var err error
+	if multiContainerProbingEnabled && len(readinessProbes) > 0 {
+		readinessProbeJSON, err = readiness.EncodeMultipleProbes(readinessProbes)
+		if err != nil {
+			return nil, fmt.Errorf("failed to serialize multiple readiness probes: %w", err)
+		}
+	} else if userContainerReadinessProbe != nil {
+		readinessProbeJSON, err = readiness.EncodeSingleProbe(userContainerReadinessProbe)
+		if err != nil {
+			return nil, fmt.Errorf("failed to serialize single readiness probe: %w", err)
+		}
+	}
+
 	fullDuplexFeature, fullDuplexExists := rev.Annotations[apicfg.AllowHTTPFullDuplexFeatureKey]
 
 	useQPResourceDefaults := cfg.Features.QueueProxyResourceDefaults == apicfg.Enabled
 	c := &corev1.Container{
 		Name:            QueueContainerName,
 		Image:           cfg.Deployment.QueueSidecarImage,
-		Resources:       createQueueResources(cfg.Deployment, rev.GetAnnotations(), container, useQPResourceDefaults),
+		Resources:       createQueueResources(cfg.Deployment, rev.GetAnnotations(), userContainer, useQPResourceDefaults),
 		Ports:           ports,
-		StartupProbe:    execProbe,
-		ReadinessProbe:  httpProbe,
+		StartupProbe:    nil,
+		ReadinessProbe:  queueProxyReadinessProbe,
 		SecurityContext: queueSecurityContext,
 		Env: []corev1.EnvVar{{
 			Name:  "SERVING_NAMESPACE",
@@ -397,7 +431,7 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container
 			Value: metrics.Domain(),
 		}, {
 			Name:  "SERVING_READINESS_PROBE",
-			Value: userProbeJSON,
+			Value: readinessProbeJSON,
 		}, {
 			Name:  "ENABLE_PROFILING",
 			Value: strconv.FormatBool(cfg.Observability.EnableProfiling),
@@ -424,19 +458,22 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container
 		}, {
 			Name:  "ROOT_CA",
 			Value: cfg.Deployment.QueueSidecarRootCA,
+		}, {
+			Name:  "ENABLE_MULTI_CONTAINER_PROBES",
+			Value: strconv.FormatBool(multiContainerProbingEnabled),
 		}},
 	}
 
 	return c, nil
 }
 
-func applyReadinessProbeDefaultsForExec(p *corev1.Probe, port int32) {
+func applyReadinessProbeDefaults(p *corev1.Probe, port int32) {
 	switch {
 	case p == nil:
 		return
 	case p.HTTPGet != nil:
 		p.HTTPGet.Host = localAddress
-		p.HTTPGet.Port = intstr.FromInt(int(port))
+		p.HTTPGet.Port = intstr.FromInt32(port)
 
 		if p.HTTPGet.Scheme == "" {
 			p.HTTPGet.Scheme = corev1.URISchemeHTTP
@@ -448,13 +485,13 @@ func applyReadinessProbeDefaultsForExec(p *corev1.Probe, port int32) {
 		})
 	case p.TCPSocket != nil:
 		p.TCPSocket.Host = localAddress
-		p.TCPSocket.Port = intstr.FromInt(int(port))
+		p.TCPSocket.Port = intstr.FromInt32(port)
 	case p.Exec != nil:
-		// User-defined ExecProbe will still be run on user-container.
+		// A user-defined ExecProbe will still be run on the user/sidecar container.
 		// Use TCP probe in queue-proxy.
 		p.TCPSocket = &corev1.TCPSocketAction{
 			Host: localAddress,
-			Port: intstr.FromInt(int(port)),
+			Port: intstr.FromInt32(port),
 		}
 		p.Exec = nil
 	case p.GRPC != nil:
diff --git a/pkg/reconciler/revision/resources/queue_test.go b/pkg/reconciler/revision/resources/queue_test.go
index e3c72d1fbb60..08084408b17b 100644
--- a/pkg/reconciler/revision/resources/queue_test.go
+++ b/pkg/reconciler/revision/resources/queue_test.go
@@ -427,6 +427,20 @@ func TestMakeQueueContainer(t *testing.T) {
 				"ENABLE_HTTP2_AUTO_DETECTION": "false",
 			})
 		}),
+	}, {
+		name: "multi container probing enabled",
+		rev:  revision("bar", "foo", withContainers(containers)),
+		fc: apicfg.Features{
+			MultiContainerProbing: apicfg.Enabled,
+		},
+		dc: deployment.Config{
+			ProgressDeadline: 0 * time.Second,
+		},
+		want: queueContainer(func(c *corev1.Container) {
+			c.Env = env(map[string]string{
+				"ENABLE_MULTI_CONTAINER_PROBES": "true",
+			})
+		}),
 	}}
 
 	for _, test := range tests {
@@ -453,9 +467,13 @@ func TestMakeQueueContainer(t *testing.T) {
 				t.Fatal("makeQueueContainer returned error:", err)
 			}
 
+			expectedProbe := probeJSON(test.rev.Spec.GetContainer())
+			if test.fc.MultiContainerProbing == apicfg.Enabled {
+				expectedProbe = "[" + expectedProbe + "]"
+			}
 			test.want.Env = append(test.want.Env, corev1.EnvVar{
 				Name:  "SERVING_READINESS_PROBE",
-				Value: probeJSON(test.rev.Spec.GetContainer()),
+				Value: expectedProbe,
 			})
 
 			sortEnv(got.Env)
@@ -1078,6 +1096,7 @@ var defaultEnv = map[string]string{
 	"TRACING_CONFIG_ZIPKIN_ENDPOINT": "",
 	"USER_PORT":                      strconv.Itoa(v1.DefaultUserPort),
 	"ROOT_CA":                        "",
+	"ENABLE_MULTI_CONTAINER_PROBES":  "false",
 }
 
 func probeJSON(container *corev1.Container) string {
diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh
index 80a6137559c8..6dcb574642ec 100755
--- a/test/e2e-tests.sh
+++ b/test/e2e-tests.sh
@@ -115,6 +115,11 @@ toggle_feature kubernetes.podspec-init-containers Enabled
 go_test_e2e -timeout=2m ./test/e2e/initcontainers ${E2E_TEST_FLAGS} || failed=1
 toggle_feature kubernetes.podspec-init-containers Disabled
 
+# Run multi-container probe tests
+toggle_feature multi-container-probing Enabled
+go_test_e2e -timeout=2m ./test/e2e/multicontainerprobing ${E2E_TEST_FLAGS} || failed=1
+toggle_feature multi-container-probing Disabled
+
 # RUN PVC tests with default storage class.
 toggle_feature kubernetes.podspec-persistent-volume-claim Enabled
 toggle_feature kubernetes.podspec-persistent-volume-write Enabled
diff --git a/test/e2e/multicontainerprobing/multicontainer_readiness_test.go b/test/e2e/multicontainerprobing/multicontainer_readiness_test.go
new file mode 100644
index 000000000000..420ccea38bd1
--- /dev/null
+++ b/test/e2e/multicontainerprobing/multicontainer_readiness_test.go
@@ -0,0 +1,97 @@
+//go:build e2e
+// +build e2e
+
+/*
+Copyright 2024 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package multicontainerprobing + +import ( + "context" + "testing" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + pkgTest "knative.dev/pkg/test" + "knative.dev/pkg/test/spoof" + v1 "knative.dev/serving/pkg/apis/serving/v1" + "knative.dev/serving/test" + v1test "knative.dev/serving/test/v1" +) + +func TestMultiContainerReadiness(t *testing.T) { + t.Parallel() + + clients := test.Setup(t) + + names := test.ResourceNames{ + Service: test.ObjectNameForTest(t), + Image: test.ServingContainer, + Sidecars: []string{ + test.SidecarContainer, + }, + } + + containers := []corev1.Container{ + { + Image: pkgTest.ImagePath(names.Image), + Ports: []corev1.ContainerPort{{ + ContainerPort: 8881, + }}, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt32(8881), + }}, + }, + }, { + Image: pkgTest.ImagePath(names.Sidecars[0]), + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt32(8882), + }}, + }, + }, + } + + test.EnsureTearDown(t, clients, &names) + + t.Log("Creating a new Service") + + resources, err := v1test.CreateServiceReady(t, clients, &names, func(svc *v1.Service) { + svc.Spec.Template.Spec.Containers = containers + }) + if err != nil { + t.Fatalf("Failed to create initial Service: %v: %v", names.Service, err) + } + + url := resources.Route.Status.URL.URL() + if _, err := pkgTest.CheckEndpointState( + context.Background(), + clients.KubeClient, + t.Logf, + url, + spoof.MatchesAllOf(spoof.IsStatusOK, spoof.MatchesBody(test.MultiContainerResponse)), + "MulticontainerServesExpectedText", + test.ServingFlags.ResolvableDomain, + test.AddRootCAtoTransport(context.Background(), t.Logf, clients, test.ServingFlags.HTTPS), + ); err != nil { + t.Fatalf("The endpoint %s for Route %s didn't serve the expected text %q: %v", url, names.Route, test.MultiContainerResponse, err) + } +}
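
Note on how the pieces above fit together: the revision reconciler serializes the readiness probes of all containers into a single JSON value for the queue-proxy's SERVING_READINESS_PROBE environment variable, and the queue-proxy decodes that value and gates its health endpoint on the aggregate result. Below is a minimal sketch of that round trip, assuming the internal readiness helpers keep the shapes shown in the hunks above (EncodeMultipleProbes, DecodeProbes, NewProbe); it is an illustration, not part of the patch.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"

	"knative.dev/serving/pkg/queue/readiness"
)

// httpProbe approximates what applyReadinessProbeDefaults produces for an
// HTTP readiness probe: host-local address, explicit port, HTTP scheme.
func httpProbe(port int32) *corev1.Probe {
	return &corev1.Probe{
		ProbeHandler: corev1.ProbeHandler{
			HTTPGet: &corev1.HTTPGetAction{
				Host:   "127.0.0.1",
				Path:   "/",
				Port:   intstr.FromInt32(port),
				Scheme: corev1.URISchemeHTTP,
			},
		},
	}
}

func main() {
	// One probe for the user container, one for a sidecar.
	probes := []*corev1.Probe{httpProbe(8080), httpProbe(8090)}

	// Reconciler side: all probes are serialized into one JSON array, which
	// becomes the value of SERVING_READINESS_PROBE on the queue-proxy.
	encoded, err := readiness.EncodeMultipleProbes(probes)
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded) // a JSON array with one entry per container probe

	// Queue-proxy side: with ENABLE_MULTI_CONTAINER_PROBES=true, sharedmain
	// decodes the full list back; with the flag off it expects the legacy
	// single-probe encoding instead.
	decoded, err := readiness.DecodeProbes(encoded, true)
	if err != nil {
		panic(err)
	}

	// A single aggregated prober wraps the decoded probes. sharedmain wires
	// pb.ProbeContainer up as the health check, and it reports success only
	// once every probe in the list has succeeded.
	pb := readiness.NewProbe(decoded)
	_ = pb.ProbeContainer // not invoked here; nothing listens on the sketch ports
}

The same split explains the queue_test.go change above: the expected SERVING_READINESS_PROBE value is wrapped in [...] only when MultiContainerProbing is enabled, because the single-probe encoding remains the default.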