diff --git a/go.mod b/go.mod
index 55e004af2..5426022be 100644
--- a/go.mod
+++ b/go.mod
@@ -90,4 +90,4 @@ replace k8s.io/client-go v12.0.0+incompatible => k8s.io/client-go v0.21.4
 
 // Replace memberlist with our fork which includes some fixes that haven't been
 // merged upstream yet.
-replace github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167
+replace github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91
diff --git a/go.sum b/go.sum
index 2a9ac7c15..0ff9f1cce 100644
--- a/go.sum
+++ b/go.sum
@@ -159,8 +159,8 @@ github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
 github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
 github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
 github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
-github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167 h1:PgEQkGHR4YimSCEGT5IoswN9gJKZDVskf+he6UClCLw=
-github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
+github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91 h1:/NipyHnOmvRsVzj81j2qE0VxsvsqhOB0f4vJIhk2qCQ=
+github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
 github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg=
 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xCzHAvxcr8HZnzsqU6ILg/0NiiE=
diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go
index 73b9c52cc..87d0485e5 100644
--- a/kv/memberlist/memberlist_client.go
+++ b/kv/memberlist/memberlist_client.go
@@ -140,6 +140,9 @@ type KVConfig struct {
 	AdvertiseAddr string `yaml:"advertise_addr"`
 	AdvertisePort int    `yaml:"advertise_port"`
 
+	ClusterLabel                     string `yaml:"cluster_label" category:"experimental"`
+	ClusterLabelVerificationDisabled bool   `yaml:"cluster_label_verification_disabled" category:"experimental"`
+
 	// List of members to join
 	JoinMembers    flagext.StringSlice `yaml:"join_members"`
 	MinJoinBackoff time.Duration       `yaml:"min_join_backoff" category:"advanced"`
@@ -193,6 +196,8 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
 	f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.")
 	f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.")
 	f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.")
+	f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.")
+	f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.")
 	cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix)
 }
 
@@ -399,12 +404,14 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
 	mlCfg.AdvertiseAddr = m.cfg.AdvertiseAddr
 	mlCfg.AdvertisePort = m.cfg.AdvertisePort
 
+	mlCfg.Label = m.cfg.ClusterLabel
+	mlCfg.SkipInboundLabelCheck = m.cfg.ClusterLabelVerificationDisabled
+
 	if m.cfg.NodeName != "" {
 		mlCfg.Name = m.cfg.NodeName
 	}
 	if m.cfg.RandomizeNodeName {
 		mlCfg.Name = mlCfg.Name + "-" + generateRandomSuffix(m.logger)
-		level.Info(m.logger).Log("msg", "Using memberlist cluster node name", "name", mlCfg.Name)
 	}
 
 	mlCfg.LogOutput = newMemberlistLoggerAdapter(m.logger, false)
@@ -420,6 +427,8 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
 	mlCfg.ProbeInterval = 5 * time.Second // Probe a random node every this interval. This setting is also the total timeout for the direct + indirect probes.
 	mlCfg.ProbeTimeout = 2 * time.Second  // Timeout for the direct probe.
 
+	level.Info(m.logger).Log("msg", "Using memberlist cluster label and node name", "cluster_label", mlCfg.Label, "node", mlCfg.Name)
+
 	return mlCfg, nil
 }
 
diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go
index 2daf89dee..b375cb3d4 100644
--- a/kv/memberlist/memberlist_client_test.go
+++ b/kv/memberlist/memberlist_client_test.go
@@ -209,16 +209,11 @@ func getData(t *testing.T, kv *Client, key string) *data {
 	return nil
 }
 
-func cas(t *testing.T, kv *Client, key string, updateFn func(*data) (*data, bool, error)) {
-	t.Helper()
-
-	if err := casWithErr(context.Background(), t, kv, key, updateFn); err != nil {
-		t.Fatal(err)
-	}
+func cas(kv *Client, key string, updateFn func(*data) (*data, bool, error)) error {
+	return casWithErr(context.Background(), kv, key, updateFn)
 }
 
-func casWithErr(ctx context.Context, t *testing.T, kv *Client, key string, updateFn func(*data) (*data, bool, error)) error {
-	t.Helper()
+func casWithErr(ctx context.Context, kv *Client, key string, updateFn func(*data) (*data, bool, error)) error {
 	fn := func(in interface{}) (out interface{}, retry bool, err error) {
 		var r *data
 		if in != nil {
@@ -262,7 +257,8 @@ func TestBasicGetAndCas(t *testing.T) {
 	}
 
 	// Create member in PENDING state, with some tokens
-	cas(t, kv, key, updateFn(name))
+	err = cas(kv, key, updateFn(name))
+	require.NoError(t, err)
 
 	r := getData(t, kv, key)
 	if r == nil || r.Members[name].Timestamp == 0 || len(r.Members[name].Tokens) <= 0 {
@@ -275,17 +271,20 @@ func TestBasicGetAndCas(t *testing.T) {
 	}
 
 	// Update member into ACTIVE state
-	cas(t, kv, key, updateFn(name))
+	err = cas(kv, key, updateFn(name))
+	require.NoError(t, err)
+
 	r = getData(t, kv, key)
 	if r.Members[name].State != ACTIVE {
 		t.Errorf("Expected member to be active after second update, got %v", r)
 	}
 
 	// Delete member
-	cas(t, kv, key, func(r *data) (*data, bool, error) {
+	err = cas(kv, key, func(r *data) (*data, bool, error) {
 		delete(r.Members, name)
 		return r, true, nil
 	})
+	require.NoError(t, err)
 
 	r = getData(t, kv, key)
 	if r.Members[name].State != LEFT {
@@ -317,10 +316,11 @@ func TestCASNoOutput(t *testing.T) {
 	withFixtures(t, func(t *testing.T, kv *Client) {
 		// should succeed with single call
 		calls := 0
-		cas(t, kv, key, func(d *data) (*data, bool, error) {
+		err := cas(kv, key, func(d *data) (*data, bool, error) {
 			calls++
 			return nil, true, nil
 		})
+		require.NoError(t, err)
 
 		require.Equal(t, 1, calls)
 	})
@@ -329,7 +329,7 @@ func TestCASErrorNoRetry(t *testing.T) {
 	withFixtures(t, func(t *testing.T, kv *Client) {
 		calls := 0
-		err := casWithErr(context.Background(), t, kv, key, func(d *data) (*data, bool, error) {
+		err := casWithErr(context.Background(), kv, key, func(d *data) (*data, bool, error) {
 			calls++
 			return nil, false, errors.New("don't worry, be happy")
 		})
@@ -341,7 +341,7 @@ func TestCASErrorWithRetries(t *testing.T) {
 	withFixtures(t, func(t *testing.T, kv *Client) {
 		calls := 0
-		err := casWithErr(context.Background(), t, kv, key, func(d *data) (*data, bool, error) {
+		err := casWithErr(context.Background(), kv, key, func(d *data) (*data, bool, error) {
 			calls++
 			return nil, true, errors.New("don't worry, be happy")
 		})
@@ -352,7 +352,7 @@ func TestCASNoChange(t *testing.T) {
 	withFixtures(t, func(t *testing.T, kv *Client) {
-		cas(t, kv, key, func(in *data) (*data, bool, error) {
+		err := cas(kv, key, func(in *data) (*data, bool, error) {
 			if in == nil {
 				in = &data{Members: map[string]member{}}
 			}
@@ -365,10 +365,11 @@ func TestCASNoChange(t *testing.T) {
 
 			return in, true, nil
 		})
+		require.NoError(t, err)
 
 		startTime := time.Now()
 		calls := 0
-		err := casWithErr(context.Background(), t, kv, key, func(d *data) (*data, bool, error) {
+		err = casWithErr(context.Background(), kv, key, func(d *data) (*data, bool, error) {
 			calls++
 			return d, true, nil
 		})
@@ -381,7 +382,7 @@ func TestCASNoChangeShortTimeout(t *testing.T) {
 	withFixtures(t, func(t *testing.T, kv *Client) {
-		cas(t, kv, key, func(in *data) (*data, bool, error) {
+		err := cas(kv, key, func(in *data) (*data, bool, error) {
 			if in == nil {
 				in = &data{Members: map[string]member{}}
 			}
@@ -394,12 +395,13 @@ func TestCASNoChangeShortTimeout(t *testing.T) {
 
 			return in, true, nil
 		})
+		require.NoError(t, err)
 
 		ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
 		defer cancel()
 
 		calls := 0
-		err := casWithErr(ctx, t, kv, key, func(d *data) (*data, bool, error) {
+		err = casWithErr(ctx, kv, key, func(d *data) (*data, bool, error) {
 			calls++
 			return d, true, nil
 		})
@@ -410,24 +412,26 @@ func TestCASFailedBecauseOfVersionChanges(t *testing.T) {
 	withFixtures(t, func(t *testing.T, kv *Client) {
-		cas(t, kv, key, func(in *data) (*data, bool, error) {
+		err := cas(kv, key, func(in *data) (*data, bool, error) {
 			return &data{Members: map[string]member{"nonempty": {Timestamp: time.Now().Unix()}}}, true, nil
 		})
+		require.NoError(t, err)
 
 		calls := 0
 		// outer cas
-		err := casWithErr(context.Background(), t, kv, key, func(d *data) (*data, bool, error) {
+		err = casWithErr(context.Background(), kv, key, func(d *data) (*data, bool, error) {
 			// outer CAS logic
 			calls++
 
 			// run inner-CAS that succeeds, and that will make outer cas to fail
-			cas(t, kv, key, func(d *data) (*data, bool, error) {
+			err = cas(kv, key, func(d *data) (*data, bool, error) {
 				// to avoid delays due to merging, we update different ingester each time.
 				d.Members[fmt.Sprintf("%d", calls)] = member{
 					Timestamp: time.Now().Unix(),
 				}
 				return d, true, nil
 			})
+			require.NoError(t, err)
 
 			d.Members["world"] = member{
 				Timestamp: time.Now().Unix(),
@@ -467,8 +471,10 @@ func TestMultipleCAS(t *testing.T) {
 			defer wg.Done()
 			<-start
 			up := updateFn(name)
-			cas(t, kv, "test", up) // JOINING state
-			cas(t, kv, "test", up) // ACTIVE state
+			err := cas(kv, "test", up) // JOINING state
+			require.NoError(t, err)
+			err = cas(kv, "test", up) // ACTIVE state
+			require.NoError(t, err)
 		}(fmt.Sprintf(namePattern, i))
 	}
 
@@ -500,7 +506,8 @@ func TestMultipleCAS(t *testing.T) {
 				delete(in.Members, name)
 				return in, true, nil
 			}
-			cas(t, kv, "test", up) // PENDING state
+			err := cas(kv, "test", up) // PENDING state
+			require.NoError(t, err)
 		}(fmt.Sprintf(namePattern, i))
 	}
 
@@ -519,34 +526,127 @@ func TestMultipleCAS(t *testing.T) {
 	}
 }
 
+func defaultKVConfig(i int) KVConfig {
+	id := fmt.Sprintf("Member-%d", i)
+	var cfg KVConfig
+	flagext.DefaultValues(&cfg)
+	cfg.NodeName = id
+
+	cfg.GossipInterval = 100 * time.Millisecond
+	cfg.GossipNodes = 10
+	cfg.PushPullInterval = 5 * time.Second
+
+	cfg.TCPTransport = TCPTransportConfig{
+		BindAddrs: []string{"localhost"},
+		BindPort:  0, // randomize ports
+	}
+
+	return cfg
+}
+
 func TestMultipleClients(t *testing.T) {
-	c := dataCodec{}
+	members := 10
 
-	const members = 10
-	const key = "ring"
+	err := testMultipleClientsWithConfigGenerator(t, members, defaultKVConfig)
+	require.NoError(t, err)
+}
 
-	var clients []*Client
+func TestMultipleClientsWithMixedLabelsAndExpectFailure(t *testing.T) {
+	// We want 3 members, they will be configured with the following labels:
+	// 1) ""
+	// 2) "label1"
+	// 3) "label2"
+	//
+	// We expect that it won't be possible to build a memberlist cluster with mixed labels.
+	var membersLabel = []string{
+		"",
+		"label1",
+		"label2",
+	}
 
-	stop := make(chan struct{})
-	start := make(chan struct{})
+	configGen := func(i int) KVConfig {
+		cfg := defaultKVConfig(i)
 
-	port := 0
+		cfg.ClusterLabelVerificationDisabled = false
+		cfg.ClusterLabel = membersLabel[i]
 
-	for i := 0; i < members; i++ {
-		id := fmt.Sprintf("Member-%d", i)
-		var cfg KVConfig
-		flagext.DefaultValues(&cfg)
-		cfg.NodeName = id
+		return cfg
+	}
 
-		cfg.GossipInterval = 100 * time.Millisecond
-		cfg.GossipNodes = 3
-		cfg.PushPullInterval = 5 * time.Second
+	err := testMultipleClientsWithConfigGenerator(t, len(membersLabel), configGen)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), fmt.Sprintf("expected to see at least %d updates, got", len(membersLabel)))
+}
 
-		cfg.TCPTransport = TCPTransportConfig{
-			BindAddrs: []string{"localhost"},
-			BindPort:  0, // randomize ports
+func TestMultipleClientsWithMixedLabelsAndClusterLabelVerificationDisabled(t *testing.T) {
+	// We want 3 members, all will have the cluster label verification disabled.
+	// They will be configured with mixed labels, and some without any labels.
+	//
+	// If the disabled verification works as expected then these members
+	// will be able to form a cluster together.
+	var membersLabel = []string{
+		"",
+		"label1",
+		"label2",
+	}
+
+	configGen := func(i int) KVConfig {
+		cfg := defaultKVConfig(i)
+
+		cfg.ClusterLabelVerificationDisabled = true
+		cfg.ClusterLabel = membersLabel[i]
+
+		return cfg
+	}
+
+	err := testMultipleClientsWithConfigGenerator(t, len(membersLabel), configGen)
+	require.NoError(t, err)
+}
+
+func TestMultipleClientsWithSameLabelWithClusterLabelVerification(t *testing.T) {
+	members := 10
+	label := "myTestLabel"
+
+	configGen := func(i int) KVConfig {
+		cfg := defaultKVConfig(i)
+
+		cfg.ClusterLabel = label
+
+		return cfg
+	}
+
+	err := testMultipleClientsWithConfigGenerator(t, members, configGen)
+	require.NoError(t, err)
+}
+
+func testMultipleClientsWithConfigGenerator(t *testing.T, members int, configGen func(memberId int) KVConfig) error {
+	c := dataCodec{}
+	const key = "ring"
+	var clients []*Client
+	port := 0
+	casInterval := time.Second
+
+	var clientWg sync.WaitGroup
+	clientWg.Add(members)
+	clientErrCh := make(chan error, members)
+	getClientErr := func() error {
+		select {
+		case err := <-clientErrCh:
+			return err
+		default:
+			return nil
 		}
+	}
+
+	stop := make(chan struct{})
+	defer func() {
+		close(stop)
+		clientWg.Wait()
+	}()
+	start := make(chan struct{})
+	for i := 0; i < members; i++ {
+		cfg := configGen(i)
 		cfg.Codecs = []codec.Codec{c}
 
 		mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
@@ -554,24 +654,33 @@ func TestMultipleClients(t *testing.T) {
 		kv, err := NewClient(mkv, c)
 		require.NoError(t, err)
 
 		clients = append(clients, kv)
 
-		go runClient(t, kv, id, key, port, start, stop)
+		go func(port int) {
+			defer clientWg.Done()
+
+			if err := runClient(kv, cfg.NodeName, key, port, casInterval, start, stop); err != nil {
+				clientErrCh <- err
+			}
+		}(port) // Must copy value, otherwise the next iteration might update it before runClient() gets it.
 
 		// next KV will connect to this one
 		port = kv.kv.GetListeningPort()
 	}
 
-	println("Waiting before start")
+	t.Log("Waiting before start")
 	time.Sleep(2 * time.Second)
 	close(start)
 
-	println("Observing ring ...")
-
+	if err := getClientErr(); err != nil {
+		return err
+	}
+
+	t.Log("Observing ring ...")
 	startTime := time.Now()
 
 	firstKv := clients[0]
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), casInterval*3/2) // Watch for 1.5 cas intervals.
 	updates := 0
 	firstKv.WatchKey(ctx, key, func(in interface{}) bool {
 		updates++
@@ -594,7 +703,11 @@ func TestMultipleClients(t *testing.T) {
 	if updates < members {
 		// in general, at least one update from each node. (although that's not necessarily true...
 		// but typically we get more updates than that anyway)
-		t.Errorf("expected to see updates, got %d", updates)
+		return fmt.Errorf("expected to see at least %d updates, got %d", members, updates)
+	}
+
+	if err := getClientErr(); err != nil {
+		return err
 	}
 
 	// Let's check all the clients to see if they have relatively up-to-date information
@@ -608,13 +721,13 @@ func TestMultipleClients(t *testing.T) {
 		r := getData(t, kv, key)
 		t.Logf("KV %d: number of known members: %d\n", i, len(r.Members))
 		if len(r.Members) != members {
-			t.Errorf("Member %d has only %d members in the ring", i, len(r.Members))
+			return fmt.Errorf("Member %d has only %d members in the ring", i, len(r.Members))
 		}
 
 		minTimestamp, maxTimestamp, avgTimestamp := getTimestamps(r.Members)
 		for n, ing := range r.Members {
 			if ing.State != ACTIVE {
-				t.Errorf("Member %d: invalid state of member %s in the ring: %v ", i, n, ing.State)
+				return fmt.Errorf("Member %d: invalid state of member %s in the ring: %v ", i, n, ing.State)
 			}
 		}
 		now := time.Now()
@@ -629,20 +742,18 @@ func TestMultipleClients(t *testing.T) {
 			t.Logf("Found tokens: %d", len(allTokens))
 		} else {
 			if len(allTokens) != len(tokens) {
-				t.Errorf("Member %d: Expected %d tokens, got %d", i, len(allTokens), len(tokens))
-			} else {
-				for ix, tok := range allTokens {
-					if tok != tokens[ix] {
-						t.Errorf("Member %d: Tokens at position %d differ: %v, %v", i, ix, tok, tokens[ix])
-						break
-					}
+				return fmt.Errorf("Member %d: Expected %d tokens, got %d", i, len(allTokens), len(tokens))
+			}
+
+			for ix, tok := range allTokens {
+				if tok != tokens[ix] {
+					return fmt.Errorf("Member %d: Tokens at position %d differ: %v, %v", i, ix, tok, tokens[ix])
 				}
 			}
 		}
 	}
 
-	// We cannot shutdown the KV until now in order for Get() to work reliably.
-	close(stop)
+	return getClientErr()
 }
 
@@ -715,7 +826,8 @@ func TestJoinMembersWithRetryBackoff(t *testing.T) {
 			if err != nil {
 				t.Errorf("failed to start KV: %v", err)
 			}
-			runClient(t, kv, id, key, port, start, stop)
+			err = runClient(kv, id, key, port, time.Second, start, stop)
+			require.NoError(t, err)
 		}
 
 		if i == 0 {
@@ -750,7 +862,7 @@ func TestJoinMembersWithRetryBackoff(t *testing.T) {
 	close(stop)
 
 	if observedMembers < members {
-		t.Errorf("expected to see %d but saw %d", members, observedMembers)
+		t.Errorf("expected to see at least %d but saw %d", members, observedMembers)
 	}
 }
 
@@ -827,7 +939,7 @@ func getTimestamps(members map[string]member) (min int64, max int64, avg int64) {
 	return
 }
 
-func runClient(t *testing.T, kv *Client, name string, ringKey string, portToConnect int, start <-chan struct{}, stop <-chan struct{}) {
+func runClient(kv *Client, name string, ringKey string, portToConnect int, casInterval time.Duration, start <-chan struct{}, stop <-chan struct{}) error {
 	// stop gossipping about the ring(s)
 	defer services.StopAndAwaitTerminated(context.Background(), kv.kv) //nolint:errcheck
 
@@ -840,14 +952,16 @@ func runClient(t *testing.T, kv *Client, name string, ringKey string, portToConn
 			if portToConnect > 0 {
 				_, err := kv.kv.JoinMembers([]string{fmt.Sprintf("127.0.0.1:%d", portToConnect)})
 				if err != nil {
-					t.Errorf("%s failed to join the cluster: %v", name, err)
-					return
+					return fmt.Errorf("%s failed to join the cluster: %v", name, err)
 				}
 			}
 		case <-stop:
-			return
-		case <-time.After(1 * time.Second):
-			cas(t, kv, ringKey, updateFn(name))
+			return nil
+		case <-time.After(casInterval):
+			err := cas(kv, ringKey, updateFn(name))
+			if err != nil {
+				return fmt.Errorf("failed to cas the ring: %v", err)
+			}
 		}
 	}
 }
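
The two new options above are intended to be introduced into a live cluster in the two-phase sequence the `memberlist.cluster-label-verification-disabled` flag help describes: first roll out the label everywhere with verification disabled, then re-enable verification once every member gossips with the label (the same sequence applies when changing an existing label). Below is a minimal sketch of a caller driving that rollout; the helper name, the "cluster-a" label, and the phase switch are illustrative only, assuming the dskit import paths used by this repository.

package example

import (
	"github.com/grafana/dskit/flagext"
	"github.com/grafana/dskit/kv/memberlist"
)

// clusterLabelRolloutConfig is a hypothetical helper, not part of this change.
// It returns a KVConfig for one phase of a cluster label rollout.
func clusterLabelRolloutConfig(labelRolledOutEverywhere bool) memberlist.KVConfig {
	var cfg memberlist.KVConfig
	flagext.DefaultValues(&cfg)

	// Phase 1: every member advertises the label, but inbound verification stays
	// disabled so packets from members that don't carry the label yet are accepted.
	cfg.ClusterLabel = "cluster-a" // hypothetical label value
	cfg.ClusterLabelVerificationDisabled = true

	// Phase 2: once all members run with the label, turn verification back on so
	// gossip from other clusters (or unlabelled senders) is discarded.
	if labelRolledOutEverywhere {
		cfg.ClusterLabelVerificationDisabled = false
	}

	return cfg
}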