From f0f0bbe637b18dfd1bf03a54856e5f99ecc8c7f6 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Mon, 30 Mar 2020 18:36:29 +0200 Subject: [PATCH] Fix pushes when ingesters don't have a zone. (#2357) * Fix pushes when ingesters don't have a zone. Signed-off-by: Goutham Veeramachaneni * Added test for nil zone handling Signed-off-by: Goutham Veeramachaneni * Address feedback Signed-off-by: Goutham Veeramachaneni --- integration/backward_compatibility_test.go | 93 +++++++++++++++++++++- pkg/ring/ring.go | 8 +- 2 files changed, 96 insertions(+), 5 deletions(-) diff --git a/integration/backward_compatibility_test.go b/integration/backward_compatibility_test.go index 58d5f7781b..e80e9aa20f 100644 --- a/integration/backward_compatibility_test.go +++ b/integration/backward_compatibility_test.go @@ -37,6 +37,14 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) { } } +func TestNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T) { + for _, previousImage := range previousVersionImages { + t.Run(fmt.Sprintf("Backward compatibility upgrading from %s", previousImage), func(t *testing.T) { + runNewDistributorsCanPushToOldIngestersWithReplication(t, previousImage) + }) + } +} + func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage string) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) @@ -92,6 +100,87 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s // stopped, which means the transfer to ingester-2 is completed. require.NoError(t, s.Stop(ingester1)) + checkQueries(t, consul, distributor, + expectedVector, + previousImage, + flagsForOldImage, ChunksStorageFlags, + now, + s, + 1, + ) +} + +// Check for issues like https://github.com/cortexproject/cortex/issues/2356 +func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previousImage string) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + dynamo := e2edb.NewDynamoDB() + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(dynamo, consul)) + + flagsForOldImage := mergeFlags(ChunksStorageFlags, map[string]string{ + "-schema-config-file": "", + "-config-yaml": ChunksStorageFlags["-schema-config-file"], + "-distributor.replication-factor": "3", + }) + + flagsForNewImage := mergeFlags(ChunksStorageFlags, map[string]string{ + "-distributor.replication-factor": "3", + }) + + require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) + + // Start Cortex table-manager (running on current version since the backward compatibility + // test is about testing a rolling update of other services). + tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "") + require.NoError(t, s.StartAndWaitReady(tableManager)) + + // Wait until the first table-manager sync has completed, so that we're + // sure the tables have been created. + require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds")) + + // Start other Cortex components (ingester running on previous version). + ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage) + ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage) + ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage) + distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flagsForNewImage, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3)) + + // Wait until the distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(1536), "cortex_ring_tokens_total")) + + // Push some series to Cortex. + now := time.Now() + series, expectedVector := generateSeries("series_1", now) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") + require.NoError(t, err) + + res, err := c.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + checkQueries(t, consul, distributor, + expectedVector, + previousImage, + flagsForOldImage, flagsForNewImage, + now, + s, + 3, + ) +} + +func checkQueries(t *testing.T, consul *e2e.HTTPService, distributor *e2ecortex.CortexService, + expectedVector model.Vector, + previousImage string, + flagsForOldImage, flagsForNewImage map[string]string, + now time.Time, + s *e2e.Scenario, + numIngesters int, +) { // Query the new ingester both with the old and the new querier. for _, image := range []string{previousImage, ""} { var querier *e2ecortex.CortexService @@ -99,13 +188,13 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s if image == previousImage { querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flagsForOldImage, image) } else { - querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, image) + querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flagsForNewImage, image) } require.NoError(t, s.StartAndWaitReady(querier)) // Wait until the querier has updated the ring. - require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(float64(numIngesters*512)), "cortex_ring_tokens_total")) // Query the series c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 43a982f1c4..ec9243688e 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -199,11 +199,13 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet if _, ok := distinctHosts[token.Ingester]; ok { continue } - if _, ok := distinctZones[token.Zone]; ok { - continue + if token.Zone != "" { // Ignore if the ingesters don't have a zone set. + if _, ok := distinctZones[token.Zone]; ok { + continue + } + distinctZones[token.Zone] = struct{}{} } distinctHosts[token.Ingester] = struct{}{} - distinctZones[token.Zone] = struct{}{} ingester := r.ringDesc.Ingesters[token.Ingester] // We do not want to Write to Ingesters that are not ACTIVE, but we do want