blockbuilder: test out-of-order samples and records
Signed-off-by: Vladimir Varankin <[email protected]>
narqo committed Sep 17, 2024
1 parent 4ff82ef commit 748286a
126 changes: 123 additions & 3 deletions pkg/blockbuilder/blockbuilder_test.go
@@ -287,6 +287,121 @@ func TestBlockBuilder_WithMultipleTenants(t *testing.T) {
}
}

func TestBlockBuilder_WithOutOfOrderRecordsAndSamples(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

_, kafkaAddr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, testTopic)

kafkaClient := mustKafkaClient(t, kafkaAddr)
kafkaClient.AddConsumeTopics(testTopic)

cfg, overrides := blockBuilderConfig(t, kafkaAddr)

bb, err := New(cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), overrides)
require.NoError(t, err)

// We don't want to run the service here. Instead, the test cases below trigger and assert the consumption cycles explicitly.
require.NoError(t, bb.starting(ctx))
t.Cleanup(func() {
require.NoError(t, bb.stopping(nil))
})

const tenantID = "1"

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenantID)

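// cycleEndAtStartup is assumed to return the end of the consume cycle in progress
// at startup, aligning the current time to the consume interval plus the configured buffer.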
cycleEndStartup := cycleEndAtStartup(time.Now(), cfg.ConsumeInterval, cfg.ConsumeIntervalBuffer)
cycleEnd := cycleEndStartup

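// allSamples accumulates every sample produced below, so the subtests can assert
// which subset has been consumed after each cycle.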
var allSamples []mimirpb.Sample

// Out-of-order sample w.r.t. the samples in the last cycle. But for this cycle,
// the TSDB starts fresh, so in terms of the actual block building it is taken
// as in-order. Depending on whether the out-of-order and the in-order samples
// below cross the 2h block boundary, we expect either 1 or 2 blocks.
{
kafkaRecTime := cycleEnd
outOfOrderSampleTime := kafkaRecTime.Add(-time.Hour)
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, outOfOrderSampleTime)
allSamples = append(allSamples, samples...)
}

// In-order sample w.r.t. the last consume cycle.
{
kafkaRecTime := cycleEnd.Add(cfg.ConsumeInterval / 10)
inOrderSampleTime := kafkaRecTime.Add(-time.Minute)
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, inOrderSampleTime)
allSamples = append(allSamples, samples...)
}

// Advance the tracking time to the next cycle.
cycleEnd = cycleEnd.Add(cfg.ConsumeInterval)

// The sample is not part of the next cycle (it is a future sample), but its Kafka record falls within this cycle.
// This sample should not go into the first cycle.
{
kafkaRecTime := cycleEnd.Add(cfg.ConsumeInterval / 10)
sampleTime := cycleEnd.Add(cfg.ConsumeInterval)
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, sampleTime)
allSamples = append(allSamples, samples...)
}

// An in-order sample that falls within the next cycle, while its Kafka record's timestamp does not fall within the next consume cycle.
// This sample should not go into the first cycle.
{
kafkaRecTime := cycleEnd.Add(time.Minute)
sampleTime := cycleEndStartup.Add(cfg.ConsumeInterval / 10).Add(time.Minute)
samples := produceSamples(ctx, t, kafkaClient, kafkaRecTime, tenantID, sampleTime)
allSamples = append(allSamples, samples...)
}

t.Run("consume only out of order and in-order", func(t *testing.T) {
require.NoError(t, bb.nextConsumeCycle(ctx, cycleEnd))

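// compareQuery (defined elsewhere in this file) queries the given TSDB and
// asserts that the returned samples match the expected set.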
compareQuery(t,
mustTSDBOpen(t, bucketDir),
allSamples[:len(allSamples)-2], // Don't expect the last two samples.
nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
})

t.Run("future record", func(t *testing.T) {
// The sample from above that was in-order, but whose Kafka record was in the future,
// should get consumed in this cycle. The other sample, still in the future, should not be consumed.
cycleEnd = cycleEnd.Add(cfg.ConsumeInterval)
require.NoError(t, bb.nextConsumeCycle(ctx, cycleEnd))

// The second-to-last sample in allSamples is the one that is still in the future. We don't expect it here.
expSamples := append([]mimirpb.Sample(nil), allSamples[:len(allSamples)-2]...)
expSamples = append(expSamples, allSamples[len(allSamples)-1])
compareQuery(t,
mustTSDBOpen(t, bucketDir),
expSamples,
nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
})

t.Run("future sample", func(t *testing.T) {
// The future sample gets consumed here. This also tests the case where, even though
// the Kafka record after this one was processed completely, we still go back to the
// last Kafka commit and consume the sample that happened to be in the future.
cycleEnd = cycleEnd.Add(cfg.ConsumeInterval)
require.NoError(t, bb.nextConsumeCycle(ctx, cycleEnd))

compareQuery(t,
mustTSDBOpen(t, bucketDir),
allSamples,
nil,
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"),
)
})
}

func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) {
t.Parallel()

@@ -369,9 +484,7 @@ func TestBlockBuilder_WithNonMonotonicRecordTimestamps(t *testing.T) {
require.NoError(t, bb.nextConsumeCycle(ctx, end))

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenantID)
- db, err := tsdb.Open(bucketDir, log.NewNopLogger(), nil, nil, nil)
- require.NoError(t, err)
- t.Cleanup(func() { require.NoError(t, db.Close()) })
+ db := mustTSDBOpen(t, bucketDir)

compareQuery(t,
db,
@@ -533,3 +646,10 @@ func mustTimeParse(t *testing.T, layout, v string) time.Time {
require.False(t, ts.IsZero())
return ts
}

func mustTSDBOpen(t *testing.T, bucketDir string) *tsdb.DB {
db, err := tsdb.Open(bucketDir, log.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
return db
}
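
Note: the test relies on a produceSamples helper that this diff does not show. Below is a minimal sketch of the shape such a helper could take, assuming franz-go's kgo client and Mimir's mimirpb types; the "foo" label is chosen only so the series matches the matcher the test queries with, and the real implementation elsewhere in blockbuilder_test.go may differ.

// Sketch only; treat the details as assumptions. It produces a single Kafka
// record with the given record timestamp, carrying a write request with one
// float sample per sample timestamp, and returns those samples for later assertions.
func produceSamples(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, recTime time.Time, tenantID string, sampleTimes ...time.Time) []mimirpb.Sample {
	var samples []mimirpb.Sample
	for _, ts := range sampleTimes {
		// The value is arbitrary; deriving it from the timestamp keeps samples distinguishable.
		samples = append(samples, mimirpb.Sample{TimestampMs: ts.UnixMilli(), Value: float64(ts.UnixMilli())})
	}
	req := mimirpb.WriteRequest{Timeseries: []mimirpb.PreallocTimeseries{{
		TimeSeries: &mimirpb.TimeSeries{
			Labels:  []mimirpb.LabelAdapter{{Name: "foo", Value: "bar"}},
			Samples: samples,
		},
	}}}
	data, err := req.Marshal()
	require.NoError(t, err)

	// The record timestamp is deliberately decoupled from the sample timestamps:
	// that is exactly the property the test above exercises.
	rec := &kgo.Record{
		Topic:     testTopic,
		Key:       []byte(tenantID),
		Value:     data,
		Timestamp: recTime,
	}
	require.NoError(t, kafkaClient.ProduceSync(ctx, rec).FirstErr())
	return samples
}

Keeping the record timestamp independent from the sample timestamps is what lets the test exercise every combination of in-order, out-of-order, and future records and samples.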
