Skip to content

Commit

Permalink
Fix pushes when ingesters don't have a zone. (cortexproject#2357)
Browse files Browse the repository at this point in the history
* Fix pushes when ingesters don't have a zone.

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Added test for nil zone handling

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Address feedback

Signed-off-by: Goutham Veeramachaneni <[email protected]>
  • Loading branch information
gouthamve authored Mar 30, 2020
1 parent 500d41c commit f0f0bbe
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 5 deletions.
93 changes: 91 additions & 2 deletions integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
}
}

// TestNewDistributorsCanPushToOldIngestersWithReplication runs the replicated
// push scenario once for every entry of previousVersionImages, each as its own
// named subtest.
func TestNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T) {
	for _, oldImage := range previousVersionImages {
		subtestName := fmt.Sprintf("Backward compatibility upgrading from %s", oldImage)

		t.Run(subtestName, func(t *testing.T) {
			runNewDistributorsCanPushToOldIngestersWithReplication(t, oldImage)
		})
	}
}

func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage string) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
Expand Down Expand Up @@ -92,20 +100,101 @@ func runBackwardCompatibilityTestWithChunksStorage(t *testing.T, previousImage s
// stopped, which means the transfer to ingester-2 is completed.
require.NoError(t, s.Stop(ingester1))

checkQueries(t, consul, distributor,
expectedVector,
previousImage,
flagsForOldImage, ChunksStorageFlags,
now,
s,
1,
)
}

// runNewDistributorsCanPushToOldIngestersWithReplication brings up three
// ingesters on previousImage plus a distributor started with an empty image
// argument, all with a replication factor of 3, pushes one series through the
// distributor and then delegates verification to checkQueries.
//
// Guards against issues like https://github.com/cortexproject/cortex/issues/2356
func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previousImage string) {
	const (
		// numIngesters is the number of ingesters started below; the ring is
		// expected to hold 512 tokens for each of them.
		numIngesters      = 3
		replicationFactor = "3"
	)

	scenario, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer scenario.Close()

	// Bring up the backing services first.
	dynamoDB := e2edb.NewDynamoDB()
	consul := e2edb.NewConsul()
	require.NoError(t, scenario.StartAndWaitReady(dynamoDB, consul))

	// The old image receives the schema config through -config-yaml, with
	// -schema-config-file blanked out.
	oldFlags := mergeFlags(ChunksStorageFlags, map[string]string{
		"-schema-config-file":             "",
		"-config-yaml":                    ChunksStorageFlags["-schema-config-file"],
		"-distributor.replication-factor": replicationFactor,
	})

	newFlags := mergeFlags(ChunksStorageFlags, map[string]string{
		"-distributor.replication-factor": replicationFactor,
	})

	require.NoError(t, writeFileToSharedDir(scenario, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

	// The table-manager runs on the current version: this backward
	// compatibility test is about a rolling update of the other services.
	tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
	require.NoError(t, scenario.StartAndWaitReady(tableManager))

	// Block until the first table-manager sync has succeeded, which guarantees
	// the tables exist.
	require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds"))

	// Remaining Cortex components: ingesters on the previous version, and the
	// distributor started with an empty image argument.
	var (
		ingesterA = e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), oldFlags, previousImage)
		ingesterB = e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), oldFlags, previousImage)
		ingesterC = e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), oldFlags, previousImage)
		dist      = e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), newFlags, "")
	)
	require.NoError(t, scenario.StartAndWaitReady(dist, ingesterA, ingesterB, ingesterC))

	// Block until the distributor's ring view contains every ingester's tokens.
	require.NoError(t, dist.WaitSumMetrics(e2e.Equals(numIngesters*512), "cortex_ring_tokens_total"))

	// Write one series through the distributor.
	pushTime := time.Now()
	series, wantVector := generateSeries("series_1", pushTime)

	client, err := e2ecortex.NewClient(dist.HTTPEndpoint(), "", "", "", "user-1")
	require.NoError(t, err)

	resp, err := client.Push(series)
	require.NoError(t, err)
	require.Equal(t, 200, resp.StatusCode)

	checkQueries(t, consul, dist,
		wantVector,
		previousImage,
		oldFlags, newFlags,
		pushTime,
		scenario,
		numIngesters,
	)
}

func checkQueries(t *testing.T, consul *e2e.HTTPService, distributor *e2ecortex.CortexService,
expectedVector model.Vector,
previousImage string,
flagsForOldImage, flagsForNewImage map[string]string,
now time.Time,
s *e2e.Scenario,
numIngesters int,
) {
// Query the new ingester both with the old and the new querier.
for _, image := range []string{previousImage, ""} {
var querier *e2ecortex.CortexService

if image == previousImage {
querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flagsForOldImage, image)
} else {
querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, image)
querier = e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flagsForNewImage, image)
}

require.NoError(t, s.StartAndWaitReady(querier))

// Wait until the querier has updated the ring.
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(float64(numIngesters*512)), "cortex_ring_tokens_total"))

// Query the series
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
Expand Down
8 changes: 5 additions & 3 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,13 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
if _, ok := distinctHosts[token.Ingester]; ok {
continue
}
if _, ok := distinctZones[token.Zone]; ok {
continue
if token.Zone != "" { // Ignore if the ingesters don't have a zone set.
if _, ok := distinctZones[token.Zone]; ok {
continue
}
distinctZones[token.Zone] = struct{}{}
}
distinctHosts[token.Ingester] = struct{}{}
distinctZones[token.Zone] = struct{}{}
ingester := r.ringDesc.Ingesters[token.Ingester]

// We do not want to Write to Ingesters that are not ACTIVE, but we do want
Expand Down

0 comments on commit f0f0bbe

Please sign in to comment.