Skip to content

Commit

Permalink
operator v1: normalize cluster config before drift detection comparison
Browse files Browse the repository at this point in the history
cluster controller normalizes configs before sending them to admin-api,
when writing.
this patch makes use of the normalization when comparing for drift
detection, to avoid unexpected drifts, for example if the CRD sets
spec.additionalProperties.key to "null" (string).
  • Loading branch information
birdayz authored and RafalKorepta committed Nov 21, 2024
1 parent b457ca1 commit 6e6f3c6
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,17 @@ func (r *ClusterConfigurationDriftReconciler) Reconcile(
func hasDrift(log logr.Logger, desired, actual map[string]any, schema map[string]rpadmin.ConfigPropertyMetadata) (configuration.CentralConfigurationPatch, bool) {
// Make copy of desired, actual, so callers not surprised that items are removed by this function.
copiedDesired := make(map[string]any)

for k, v := range desired {
s := schema[k]
v := v

// Before sending cluster properties to admin-api, cluster controller "sanitizes" them.
// Do the same for drift detection.
copiedDesired[k] = configuration.ParseConfigValueBeforeUpsert(log, v, &s)
}

copiedActual := make(map[string]any)
maps.Copy(copiedDesired, desired)
maps.Copy(copiedActual, actual)

for k, v := range schema { //nolint:gocritic // ignore rangeValCopy - this is the type returned by Admin API client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,93 @@
package vectorized

import (
"context"
"fmt"
"testing"

"github.com/go-logr/logr"
"github.com/go-logr/logr/testr"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/redpanda"
"sigs.k8s.io/controller-runtime/pkg/log"
)

func TestDiffIntegration(t *testing.T) {
const user = "syncer"
const password = "password"
const saslMechanism = "SCRAM-SHA-256"

ctx := context.Background()
logger := testr.New(t)
ctx = log.IntoContext(ctx, logger)

// No auth is easy, only test on a cluster with auth on admin API.
container, err := redpanda.Run(
ctx,
"docker.redpanda.com/redpandadata/redpanda:v24.2.4",
// TODO: Upgrade to testcontainers 0.33.0 so we get
// WithBootstrapConfig. For whatever reason, it seems to not get along
// with CI.
// redpanda.WithBootstrapConfig("admin_api_require_auth", true),
redpanda.WithSuperusers("syncer"),
testcontainers.WithEnv(map[string]string{
"RP_BOOTSTRAP_USER": fmt.Sprintf("%s:%s:%s", user, password, saslMechanism),
}),
)
require.NoError(t, err)
defer func() {
_ = container.Stop(ctx, nil)
}()

// Configure the same environment as we'll be executed in within the helm
// chart:
// https://github.com/redpanda-data/helm-charts/commit/081c08b6b83ba196994ec3312a7c6011e4ef0a22#diff-84c6555620e4e5f79262384a9fa3e8f4876b36bb3a64748cbd8fbdcb66e8c1b9R966
t.Setenv("RPK_USER", user)
t.Setenv("RPK_PASS", password)
t.Setenv("RPK_SASL_MECHANISM", saslMechanism)

adminAPIAddr, err := container.AdminAPIAddress(ctx)
require.NoError(t, err)

adminAPIClient, err := rpadmin.NewAdminAPI([]string{adminAPIAddr}, &rpadmin.BasicAuth{
Username: user,
Password: password,
}, nil)
require.NoError(t, err)

_, err = adminAPIClient.PatchClusterConfig(ctx, map[string]any{
"kafka_rpc_server_tcp_send_buf": 102400,
}, []string{})
require.NoError(t, err)

schema, err := adminAPIClient.ClusterConfigSchema(ctx)
require.NoError(t, err)

config, err := adminAPIClient.Config(ctx, true)
require.NoError(t, err)
require.Equal(t, config["kafka_rpc_server_tcp_send_buf"].(float64), float64(102400))

_, drift := hasDrift(logr.Discard(), map[string]any{
"kafka_rpc_server_tcp_send_buf": "null",
}, config, schema)
require.True(t, drift, `expecting to see a drift between integer in redpanda and "null" in desired configuration`)

_, err = adminAPIClient.PatchClusterConfig(ctx, map[string]any{
"kafka_rpc_server_tcp_send_buf": nil,
}, []string{})
require.NoError(t, err)

config, err = adminAPIClient.Config(ctx, true)
require.NoError(t, err)

_, drift = hasDrift(logr.Discard(), map[string]any{
"kafka_rpc_server_tcp_send_buf": "null",
}, config, schema)
require.False(t, drift, `shall have no drift if we compare null against "null"`)
}

func TestDiffWithNull(t *testing.T) {
assert := require.New(t)

Expand Down
5 changes: 3 additions & 2 deletions operator/pkg/resources/configuration/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func ThreeWayMerge(
for k, v := range apply {
if oldValue, ok := current[k]; !ok || !PropertiesEqual(log, v, oldValue, schema[k]) {
metadata := schema[k]
patch.Upsert[k] = parseConfigValueBeforeUpsert(log, v, &metadata)
patch.Upsert[k] = ParseConfigValueBeforeUpsert(log, v, &metadata)
}
}
invalidSet := make(map[string]bool, len(invalidProperties))
Expand All @@ -99,7 +99,7 @@ func ThreeWayMerge(
return patch
}

func parseConfigValueBeforeUpsert(log logr.Logger, value interface{}, metadata *rpadmin.ConfigPropertyMetadata) interface{} {
func ParseConfigValueBeforeUpsert(log logr.Logger, value interface{}, metadata *rpadmin.ConfigPropertyMetadata) interface{} {
tempValue := fmt.Sprintf("%v", value)

//nolint:gocritic // no need to be a switch case
Expand Down Expand Up @@ -145,6 +145,7 @@ func PropertiesEqual(
l logr.Logger, v1, v2 interface{}, metadata rpadmin.ConfigPropertyMetadata,
) bool {
log := l.WithName("PropertiesEqual")

switch metadata.Type {
case "number":
if f1, f2, ok := bothFloat64(v1, v2); ok {
Expand Down

0 comments on commit 6e6f3c6

Please sign in to comment.