diff --git a/.mdox.yaml b/.mdox.yaml index 4f8a0be006c..1c9c4dd589c 100644 --- a/.mdox.yaml +++ b/.mdox.yaml @@ -72,6 +72,10 @@ transformations: backMatter: *docBackMatter # Non-versioned element: Blog. + + - glob: "support/*" + path: /../support/* + - glob: "blog/*" path: /../blog/* diff --git a/CHANGELOG.md b/CHANGELOG.md index df6e93e4cda..64b849f9b17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs. - [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule. - [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Apply series limit when streaming series for series actually matched if lazy postings is enabled. +- [#6984](https://github.com/thanos-io/thanos/pull/6984) Store Gateway: Added `--store.index-header-lazy-download-strategy` to specify how to lazily download index headers when lazy mmap is enabled. 
### Changed diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 9191b77ce8e..7d80687ec30 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -28,6 +28,7 @@ import ( blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" hidden "github.com/thanos-io/thanos/pkg/extflag" @@ -89,6 +90,8 @@ type storeConfig struct { lazyIndexReaderEnabled bool lazyIndexReaderIdleTimeout time.Duration lazyExpandedPostingsEnabled bool + + indexHeaderLazyDownloadStrategy string } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -186,6 +189,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) + cmd.Flag("store.index-header-lazy-download-strategy", "Strategy of how to download index headers lazily. Supported values: eager, lazy. If eager, always download index header during initial load. If lazy, download index header during query time."). + Default(string(indexheader.EagerDownloadStrategy)). + EnumVar(&sc.indexHeaderLazyDownloadStrategy, string(indexheader.EagerDownloadStrategy), string(indexheader.LazyDownloadStrategy)) + cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb) cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). 
@@ -388,6 +395,9 @@ func runStore( return conf.estimatedMaxChunkSize }), store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), + store.WithIndexHeaderLazyDownloadStrategy( + indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(), + ), } if conf.debugLogging { diff --git a/docs/components/store.md b/docs/components/store.md index 3ba59a2d9e0..85fd4ce6886 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -193,6 +193,12 @@ Flags: DEPRECATED: use store.limits.request-samples. --store.grpc.touched-series-limit=0 DEPRECATED: use store.limits.request-series. + --store.index-header-lazy-download-strategy=eager + Strategy of how to download index headers + lazily. Supported values: eager, lazy. + If eager, always download index header during + initial load. If lazy, download index header + during query time. --store.limits.request-samples=0 The maximum samples allowed for a single Series request, The Series call fails if diff --git a/docs/getting-started.md b/docs/getting-started.md index 9e0a7a8ed05..da6b414472b 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -88,6 +88,10 @@ See up to date [jsonnet mixins](https://github.com/thanos-io/thanos/tree/main/mi ## Talks +* 2023 + * [Planetscale monitoring: Handling billions of active series with Prometheus and Thanos](https://www.youtube.com/watch?v=Or8r46fSaOg) + * [Taming the Tsunami: low latency ingestion of push-based metrics in Prometheus](https://www.youtube.com/watch?v=W81x1j765hc) + * 2022 * [Story of Correlation: Integrating Thanos Metrics with Observability Signals](https://www.youtube.com/watch?v=rWFb01GW0mQ) * [Running the Observability As a Service For Your Teams With Thanos](https://www.youtube.com/watch?v=I4Mfyfd_4M8) diff --git a/docs/support/welcome.md b/docs/support/welcome.md new file mode 100644 index 00000000000..c4e48c55823 --- /dev/null +++ b/docs/support/welcome.md @@ -0,0 +1,6 @@ +--- +title: 
Welcome to Support and Training! +author: Thanos Team +--- + +Anyone who has developed a Thanos training program or offers related services can add themselves to this page by opening a pull request against it. diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 56dabc33f7c..4130157a96a 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -206,7 +206,7 @@ func TestReaders(t *testing.T) { _, err := WriteBinary(ctx, bkt, id, fn) testutil.Ok(t, err) - br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil) + br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil, false) testutil.Ok(t, err) defer func() { testutil.Ok(t, br.Close()) }() diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index d7e589c724f..2b36bf80259 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -83,6 +83,9 @@ type LazyBinaryReader struct { // Keep track of the last time it was used. usedAt *atomic.Int64 + + // If true, index header will be downloaded at query time rather than initialization time. + lazyDownload bool } // NewLazyBinaryReader makes a new LazyBinaryReader. If the index-header does not exist @@ -99,8 +102,9 @@ func NewLazyBinaryReader( metrics *LazyBinaryReaderMetrics, binaryReaderMetrics *BinaryReaderMetrics, onClosed func(*LazyBinaryReader), + lazyDownload bool, ) (*LazyBinaryReader, error) { - if dir != "" { + if dir != "" && !lazyDownload { indexHeaderFile := filepath.Join(dir, id.String(), block.IndexHeaderFilename) // If the index-header doesn't exist we should download it. 
if _, err := os.Stat(indexHeaderFile); err != nil { @@ -131,6 +135,7 @@ func NewLazyBinaryReader( binaryReaderMetrics: binaryReaderMetrics, usedAt: atomic.NewInt64(time.Now().UnixNano()), onClosed: onClosed, + lazyDownload: lazyDownload, }, nil } diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index 150f2d649b3..d740da99abd 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -5,6 +5,7 @@ package indexheader import ( "context" + "fmt" "os" "path/filepath" "sync" @@ -31,11 +32,11 @@ func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) testutil.Ok(t, err) defer func() { testutil.Ok(t, bkt.Close()) }() - _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil) + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil, false) testutil.NotOk(t, err) } -func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { +func TestNewLazyBinaryReader_ShouldNotFailIfUnableToBuildIndexHeaderWhenLazyDownload(t *testing.T) { ctx := context.Background() tmpDir := t.TempDir() @@ -43,36 +44,61 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) testutil.Ok(t, err) defer func() { testutil.Ok(t, bkt.Close()) }() - - // Create block. 
- blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ - {{Name: "a", Value: "1"}}, - {{Name: "a", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil, true) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) +} - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) +func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { + ctx := context.Background() - // Should lazy load the index upon first usage. - v, err := r.IndexVersion() - testutil.Ok(t, err) - testutil.Equals(t, 2, v) - testutil.Assert(t, r.reader != nil) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + tmpDir := t.TempDir() - labelNames, err := r.LabelNames() + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + defer func() { testutil.Ok(t, bkt.Close()) }() + + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + // Create block. 
+ blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) + + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + _, err = os.Stat(filepath.Join(r.dir, blockID.String(), block.IndexHeaderFilename)) + // Index file shouldn't exist. + if lazyDownload { + testutil.Equals(t, true, os.IsNotExist(err)) + } + // Should lazy load the index upon first usage. + v, err := r.IndexVersion() + testutil.Ok(t, err) + if lazyDownload { + _, err = os.Stat(filepath.Join(r.dir, blockID.String(), block.IndexHeaderFilename)) + testutil.Ok(t, err) + } + testutil.Equals(t, 2, v) + testutil.Assert(t, r.reader != nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + }) + } } func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { @@ -96,22 +122,26 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { headerFilename := filepath.Join(tmpDir, blockID.String(), block.IndexHeaderFilename) testutil.Ok(t, os.WriteFile(headerFilename, []byte("xxx"), os.ModePerm)) - m := 
NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - - // Ensure it can read data. - labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + // Ensure it can read data. 
+ labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + }) + } } func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { @@ -131,37 +161,41 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - - // Should lazy load the index upon first usage. - labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - - // Close it. - testutil.Ok(t, r.Close()) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Should lazy load again upon next usage. - labelNames, err = r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - - // Closing an already closed lazy reader should be a no-op. 
- for i := 0; i < 2; i++ { - testutil.Ok(t, r.Close()) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + + // Close it. + testutil.Ok(t, r.Close()) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Should lazy load again upon next usage. + labelNames, err = r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + + // Closing an already closed lazy reader should be a no-op. 
+ for i := 0; i < 2; i++ { + testutil.Ok(t, r.Close()) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + } + }) } } @@ -182,34 +216,38 @@ func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - - // Should lazy load the index upon first usage. - labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Try to unload but not idle since enough time. - testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano())) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Try to unload and idle since enough time. 
- testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano())) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Try to unload but not idle since enough time. + testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano())) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Try to unload and idle since enough time. 
+ testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano())) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + }) + } } func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { @@ -232,49 +270,53 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - t.Cleanup(func() { - testutil.Ok(t, r.Close()) - }) - - done := make(chan struct{}) - time.AfterFunc(runDuration, func() { close(done) }) - wg := sync.WaitGroup{} - wg.Add(2) - - // Start a goroutine which continuously try to unload the reader. - go func() { - defer wg.Done() - - for { - select { - case <-done: - return - default: - testutil.Ok(t, r.unloadIfIdleSince(0)) - } - } - }() - - // Try to read multiple times, while the other goroutine continuously try to unload it. - go func() { - defer wg.Done() - - for { - select { - case <-done: - return - default: - _, err := r.PostingsOffset("a", "1") - testutil.Assert(t, err == nil || err == errUnloadedWhileLoading) - } - } - }() - - // Wait until both goroutines have done. 
- wg.Wait() + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + t.Cleanup(func() { + testutil.Ok(t, r.Close()) + }) + + done := make(chan struct{}) + time.AfterFunc(runDuration, func() { close(done) }) + wg := sync.WaitGroup{} + wg.Add(2) + + // Start a goroutine which continuously try to unload the reader. + go func() { + defer wg.Done() + + for { + select { + case <-done: + return + default: + testutil.Ok(t, r.unloadIfIdleSince(0)) + } + } + }() + + // Try to read multiple times, while the other goroutine continuously try to unload it. + go func() { + defer wg.Done() + + for { + select { + case <-done: + return + default: + _, err := r.PostingsOffset("a", "1") + testutil.Assert(t, err == nil || err == errUnloadedWhileLoading) + } + } + }() + + // Wait until both goroutines have done. + wg.Wait() + }) + } } diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index fc8cb268139..e9fe5eb7dca 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -14,6 +14,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + + "github.com/thanos-io/thanos/pkg/block/metadata" ) // ReaderPoolMetrics holds metrics tracked by ReaderPool. @@ -46,10 +48,47 @@ type ReaderPool struct { // Keep track of all readers managed by the pool. lazyReadersMx sync.Mutex lazyReaders map[*LazyBinaryReader]struct{} + + lazyDownloadFunc LazyDownloadIndexHeaderFunc +} + +// IndexHeaderLazyDownloadStrategy specifies how to download index headers +// lazily. Only used when lazy mmap is enabled. 
+type IndexHeaderLazyDownloadStrategy string + +const ( + // EagerDownloadStrategy always disables lazy downloading index headers. + EagerDownloadStrategy IndexHeaderLazyDownloadStrategy = "eager" + // LazyDownloadStrategy always lazily downloads index headers. + LazyDownloadStrategy IndexHeaderLazyDownloadStrategy = "lazy" +) + +func (s IndexHeaderLazyDownloadStrategy) StrategyToDownloadFunc() LazyDownloadIndexHeaderFunc { + switch s { + case LazyDownloadStrategy: + return AlwaysLazyDownloadIndexHeader + default: + // Always fall back to eagerly downloading the index header. + return AlwaysEagerDownloadIndexHeader + } +} + +// LazyDownloadIndexHeaderFunc is used to determine whether to download the index header lazily +// or not by checking its block metadata. Use cases include deciding by time or by index file size. +type LazyDownloadIndexHeaderFunc func(meta *metadata.Meta) bool + +// AlwaysEagerDownloadIndexHeader always eagerly downloads the index header. +func AlwaysEagerDownloadIndexHeader(meta *metadata.Meta) bool { + return false +} + +// AlwaysLazyDownloadIndexHeader always lazily downloads the index header. +func AlwaysLazyDownloadIndexHeader(meta *metadata.Meta) bool { + return true } // NewReaderPool makes a new ReaderPool. -func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics) *ReaderPool { +func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics, lazyDownloadFunc LazyDownloadIndexHeaderFunc) *ReaderPool { p := &ReaderPool{ logger: logger, metrics: metrics, @@ -57,6 +96,7 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime lazyReaderIdleTimeout: lazyReaderIdleTimeout, lazyReaders: make(map[*LazyBinaryReader]struct{}), close: make(chan struct{}), + lazyDownloadFunc: lazyDownloadFunc, } // Start a goroutine to close idle readers (only if required). 
@@ -81,12 +121,12 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime // NewBinaryReader creates and returns a new binary reader. If the pool has been configured // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader // is tracked by the pool and automatically closed once the idle timeout expires. -func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (Reader, error) { +func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, meta *metadata.Meta) (Reader, error) { var reader Reader var err error if p.lazyReaderEnabled { - reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed) + reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta)) } else { reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader) } diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go index 4ed60ea8fb4..a7445f0fed2 100644 --- a/pkg/block/indexheader/reader_pool_test.go +++ b/pkg/block/indexheader/reader_pool_test.go @@ -54,12 +54,15 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) + meta, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String())) + testutil.Ok(t, err) + for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, 
testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil)) + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil), AlwaysEagerDownloadIndexHeader) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta) testutil.Ok(t, err) defer func() { testutil.Ok(t, r.Close()) }() @@ -89,12 +92,14 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) + meta, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String())) + testutil.Ok(t, err) metrics := NewReaderPoolMetrics(nil) - pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics) + pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics, AlwaysEagerDownloadIndexHeader) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta) testutil.Ok(t, err) defer func() { testutil.Ok(t, r.Close()) }() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 3ebd6f06a47..fd4fb7392c4 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -413,6 +413,8 @@ type BucketStore struct { blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator + + indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc } func (s *BucketStore) validate() error { @@ -531,6 +533,14 @@ func WithDontResort(true bool) BucketStoreOption { } } +// WithIndexHeaderLazyDownloadStrategy specifies what block to lazy download its index header. +// Only used when lazy mmap is enabled at the same time. 
+func WithIndexHeaderLazyDownloadStrategy(strategy indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption { + return func(s *BucketStore) { + s.indexHeaderLazyDownloadStrategy = strategy + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -559,21 +569,22 @@ func NewBucketStore( b := make([]byte, 0, initialBufSize) return &b }}, - chunkPool: pool.NoopBytes{}, - blocks: map[ulid.ULID]*bucketBlock{}, - blockSets: map[uint64]*bucketBlockSet{}, - blockSyncConcurrency: blockSyncConcurrency, - queryGate: gate.NewNoop(), - chunksLimiterFactory: chunksLimiterFactory, - seriesLimiterFactory: seriesLimiterFactory, - bytesLimiterFactory: bytesLimiterFactory, - partitioner: partitioner, - enableCompatibilityLabel: enableCompatibilityLabel, - postingOffsetsInMemSampling: postingOffsetsInMemSampling, - enableSeriesResponseHints: enableSeriesResponseHints, - enableChunkHashCalculation: enableChunkHashCalculation, - seriesBatchSize: SeriesBatchSize, - sortingStrategy: sortingStrategyStore, + chunkPool: pool.NoopBytes{}, + blocks: map[ulid.ULID]*bucketBlock{}, + blockSets: map[uint64]*bucketBlockSet{}, + blockSyncConcurrency: blockSyncConcurrency, + queryGate: gate.NewNoop(), + chunksLimiterFactory: chunksLimiterFactory, + seriesLimiterFactory: seriesLimiterFactory, + bytesLimiterFactory: bytesLimiterFactory, + partitioner: partitioner, + enableCompatibilityLabel: enableCompatibilityLabel, + postingOffsetsInMemSampling: postingOffsetsInMemSampling, + enableSeriesResponseHints: enableSeriesResponseHints, + enableChunkHashCalculation: enableChunkHashCalculation, + seriesBatchSize: SeriesBatchSize, + sortingStrategy: sortingStrategyStore, + indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader, } for _, option := range options { @@ -582,7 +593,7 @@ func NewBucketStore( // Depend on the options 
indexReaderPoolMetrics := indexheader.NewReaderPoolMetrics(extprom.WrapRegistererWithPrefix("thanos_bucket_store_", s.reg)) - s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics) + s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics, s.indexHeaderLazyDownloadStrategy) s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too if err := s.validate(); err != nil { @@ -759,6 +770,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er s.dir, meta.ULID, s.postingOffsetsInMemSampling, + meta, ) if err != nil { return errors.Wrap(err, "create index header reader") diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 87659f54503..67223a9467f 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1658,7 +1658,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { bkt: objstore.WithNoopInstr(bkt), logger: logger, indexCache: indexCache, - indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil)), + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil), indexheader.AlwaysEagerDownloadIndexHeader), metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{{b1, b2}}}, diff --git a/pkg/ui/react-app/src/pages/graph/Panel.tsx b/pkg/ui/react-app/src/pages/graph/Panel.tsx index 6f4e31c0fe8..915d5d18138 100644 --- a/pkg/ui/react-app/src/pages/graph/Panel.tsx +++ b/pkg/ui/react-app/src/pages/graph/Panel.tsx @@ -240,6 +240,7 @@ class Panel extends Component { method: 'GET', headers: { 'Content-Type': 'application/json', + 'X-Thanos-Force-Tracing': 'true', // Conditionally add the header if the checkbox is 
enabled ...(this.props.options.forceTracing ? { 'X-Thanos-Force-Tracing': 'true' } : {}), }, @@ -247,8 +248,15 @@ class Panel extends Component { credentials: 'same-origin', signal: abortController.signal, }) - .then((resp) => resp.json()) - .then((json) => { + .then((resp) => { + return resp.json().then((json) => { + return { + json, + headers: resp.headers, + }; + }); + }) + .then(({ json, headers }) => { if (json.status !== 'success') { throw new Error(json.error || 'invalid response JSON'); } @@ -263,7 +271,7 @@ class Panel extends Component { } analysis = json.data.analysis; } - + const traceID = headers.get('X-Thanos-Trace-ID'); this.setState({ error: null, data: json.data, @@ -271,12 +279,14 @@ class Panel extends Component { startTime, endTime, resolution, + traceID: traceID ? traceID : '', }, warnings: json.warnings, stats: { loadTime: Date.now() - queryStart, resolution, resultSeries, + traceID, }, loading: false, analysis: analysis, diff --git a/pkg/ui/react-app/src/pages/graph/QueryStatsView.test.tsx b/pkg/ui/react-app/src/pages/graph/QueryStatsView.test.tsx index e04c914e1a3..4df5fc37c36 100755 --- a/pkg/ui/react-app/src/pages/graph/QueryStatsView.test.tsx +++ b/pkg/ui/react-app/src/pages/graph/QueryStatsView.test.tsx @@ -1,5 +1,5 @@ import * as React from 'react'; -import { shallow } from 'enzyme'; +import { mount } from 'enzyme'; import QueryStatsView from './QueryStatsView'; describe('QueryStatsView', () => { @@ -8,10 +8,13 @@ describe('QueryStatsView', () => { loadTime: 100, resolution: 5, resultSeries: 10000, + traceID: 'e575f9d4eab63a90cdc3dc4ef1b8dda0', }; - const queryStatsView = shallow(); - expect(queryStatsView.prop('className')).toEqual('query-stats'); - expect(queryStatsView.children().prop('className')).toEqual('float-right'); - expect(queryStatsView.children().text()).toEqual('Load time: 100ms   Resolution: 5s   Result series: 10000'); + const queryStatsView = mount(); + 
expect(queryStatsView.find('.query-stats').prop('className')).toEqual('query-stats'); + expect(queryStatsView.find('.float-right').prop('className')).toEqual('float-right'); + expect(queryStatsView.find('.float-right').html()).toEqual( + `Load time: ${queryStatsProps.loadTime}ms   Resolution: ${queryStatsProps.resolution}s   Result series: ${queryStatsProps.resultSeries}   Trace ID: ${queryStatsProps.traceID}` + ); }); }); diff --git a/pkg/ui/react-app/src/pages/graph/QueryStatsView.tsx b/pkg/ui/react-app/src/pages/graph/QueryStatsView.tsx index 8bfd91c74bd..d3e9a7a5c43 100644 --- a/pkg/ui/react-app/src/pages/graph/QueryStatsView.tsx +++ b/pkg/ui/react-app/src/pages/graph/QueryStatsView.tsx @@ -4,16 +4,17 @@ export interface QueryStats { loadTime: number; resolution: number; resultSeries: number; + traceID: string | null; } const QueryStatsView: FC = (props) => { - const { loadTime, resolution, resultSeries } = props; + const { loadTime, resolution, resultSeries, traceID } = props; + const prev = `Load time: ${loadTime}ms   Resolution: ${resolution}s   Result series: ${resultSeries}`; + const str = traceID ? prev + `   Trace ID: ${traceID}` : prev; return (
- - Load time: {loadTime}ms   Resolution: {resolution}s   Result series: {resultSeries} - +
); }; diff --git a/pkg/ui/react-app/src/types/types.ts b/pkg/ui/react-app/src/types/types.ts index ca31bc6cc44..08054fa1ac7 100644 --- a/pkg/ui/react-app/src/types/types.ts +++ b/pkg/ui/react-app/src/types/types.ts @@ -17,6 +17,7 @@ export interface QueryParams { startTime: number; endTime: number; resolution: number; + traceID: string; } export type Rule = { diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index f93219cae6f..5fda32c0f7b 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/prompb" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/cacheutil" @@ -41,12 +42,13 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - now := time.Now() + // Predefined Timestamp + predefTimestamp := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC) - prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "") - testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) - q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init() + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q)) inMemoryCacheConfig := queryfrontend.CacheProviderConfig{ @@ -64,17 +66,34 @@ func TestQueryFrontend(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) t.Cleanup(cancel) + // Writing a custom Timeseries into the receiver + testutil.Ok(t, remoteWrite(ctx, []prompb.TimeSeries{{ + Labels: 
[]prompb.Label{ + {Name: "__name__", Value: "up"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "test"}, + {Name: "replica", Value: "0"}, + }, + Samples: []prompb.Sample{ + {Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)}, + }}}, + i.Endpoint("remote-write"), + )) + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) // Ensure we can get the result from Querier first so that it // doesn't need to retry when we send queries to the frontend later. - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -86,13 +105,15 @@ func TestQueryFrontend(t *testing.T) { queryTimes := vals[0] t.Run("query frontend works for instant query", func(t *testing.T) { - queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -115,8 +136,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + 
timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -159,8 +180,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -181,7 +202,7 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_fetched_keys_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "cortex_cache_hits_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_gets_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total")) @@ -192,9 +213,8 @@ func TestQueryFrontend(t *testing.T) { e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "query_range"))), ) - // One more request is needed in order to satisfy the req range. 
testutil.Ok(t, q.WaitSumMetricsWithOptions( - e2emon.Equals(2), + e2emon.Equals(1), []string{"http_requests_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))), ) @@ -206,8 +226,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(24*time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(24*time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -225,13 +245,13 @@ func TestQueryFrontend(t *testing.T) { []string{"thanos_query_frontend_queries_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "op", "query_range"))), ) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "cortex_cache_fetched_keys_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "cortex_cache_fetched_keys_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_hits_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_new_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_added_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_gets_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_entries")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_gets_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_misses_total")) // Query is 25h so it will be split to 2 requests. 
testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( @@ -240,7 +260,7 @@ func TestQueryFrontend(t *testing.T) { ) testutil.Ok(t, q.WaitSumMetricsWithOptions( - e2emon.Equals(4), + e2emon.Equals(3), []string{"http_requests_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))), ) @@ -248,7 +268,7 @@ func TestQueryFrontend(t *testing.T) { t.Run("query frontend splitting works for labels names API", func(t *testing.T) { // LabelNames and LabelValues API should still work via query frontend. - labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(predefTimestamp.Add(-time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) > 0 }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -267,7 +287,7 @@ func TestQueryFrontend(t *testing.T) { e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "labels"))), ) - labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) > 0 }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -288,7 +308,7 @@ func TestQueryFrontend(t *testing.T) { }) t.Run("query frontend splitting works for labels values API", func(t *testing.T) { - labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, 
timestamp.FromTime(predefTimestamp.Add(-time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) == 1 && res[0] == "localhost:9090" }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -307,7 +327,7 @@ func TestQueryFrontend(t *testing.T) { e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "labels"))), ) - labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) == 1 && res[0] == "localhost:9090" }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -333,8 +353,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")}, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []map[string]string) bool { if len(res) != 1 { return false @@ -345,6 +365,8 @@ func TestQueryFrontend(t *testing.T) { "instance": "localhost:9090", "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", + "tenant_id": "default-tenant", }) }, ) @@ -369,8 +391,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")}, - timestamp.FromTime(now.Add(-24*time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []map[string]string) bool { if len(res) != 1 { return false @@ -381,6 +403,8 @@ func 
TestQueryFrontend(t *testing.T) { "instance": "localhost:9090", "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", + "tenant_id": "default-tenant", }) }, ) @@ -409,12 +433,13 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - now := time.Now() + // Predefined timestamp + predefTimestamp := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC) - prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "") - testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) - q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init() + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q)) memcached := e2ethanos.NewMemcached(e, "1") @@ -443,19 +468,34 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) t.Cleanup(cancel) + testutil.Ok(t, remoteWrite(ctx, []prompb.TimeSeries{{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "up"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "test"}, + {Name: "replica", Value: "0"}, + }, + Samples: []prompb.Sample{ + {Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)}, + }}}, + i.Endpoint("remote-write"))) + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "cortex_memcache_client_servers")) // Ensure we can get the result from Querier first so that it // doesn't need to retry when we send queries to the frontend 
later. - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -469,8 +509,8 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -501,8 +541,8 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 6584c7b8426..5b9a120b902 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -4,6 +4,7 @@ package e2e_test import ( + "bytes" "context" "fmt" "io" @@ -1720,6 +1721,39 @@ func rangeQuery(t *testing.T, ctx context.Context, addr string, q func() string, return retExplanation } +// Performs a remote write at the receiver external endpoint. 
+func remoteWrite(ctx context.Context, timeseries []prompb.TimeSeries, addr string) error { + // Create write request + data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: timeseries}) + if err != nil { + return err + } + + // Create HTTP request + compressed := snappy.Encode(nil, data) + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/receive", addr), bytes.NewReader(compressed)) + if err != nil { + return err + } + + req.Header.Add("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + // Execute HTTP request + res, err := promclient.NewDefaultClient().HTTPClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer runutil.ExhaustCloseWithErrCapture(&err, res.Body, "%s: close body", req.URL.String()) + + if res.StatusCode/100 != 2 { + return errors.Errorf("request failed with code %s", res.Status) + } + + return nil +} + func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, end int64, check func(data []*exemplarspb.ExemplarData) error) { t.Helper() diff --git a/website/.hugo_build.lock b/website/.hugo_build.lock new file mode 100644 index 00000000000..e69de29bb2d diff --git a/website/data/adopters.yml b/website/data/adopters.yml index 5b2ad40aebc..1ae6e5b2645 100644 --- a/website/data/adopters.yml +++ b/website/data/adopters.yml @@ -69,9 +69,9 @@ adopters: - name: Amadeus url: https://amadeus.com logo: amadeus.png -- name: Grofers - url: https://grofers.com - logo: grofers.png +- name: Blinkit + url: https://blinkit.com + logo: blinkit.png - name: Tencent url: https://github.com/tkestack/tke logo: tencent.png @@ -236,4 +236,4 @@ adopters: logo: grupo-olx.png - name: TrueLayer url: https://truelayer.com/ - logo: truelayer.png + logo: truelayer.png \ No newline at end of file diff --git a/website/layouts/_default/baseof.html b/website/layouts/_default/baseof.html index 4fd3803768a..83b539295be 100644 
--- a/website/layouts/_default/baseof.html +++ b/website/layouts/_default/baseof.html @@ -46,6 +46,9 @@ + diff --git a/website/layouts/partials/versioning/version-picker.html b/website/layouts/partials/versioning/version-picker.html index 732afd453d7..f2f4fa9c1e6 100644 --- a/website/layouts/partials/versioning/version-picker.html +++ b/website/layouts/partials/versioning/version-picker.html @@ -4,6 +4,7 @@ {{- range .Site.Sections.Reverse }} {{- $version := .Section }} {{- if eq $version "blog" }}{{continue}}{{end}} + {{- if eq $version "support" }}{{continue}}{{end}} {{ $version }} diff --git a/website/layouts/support/list.html b/website/layouts/support/list.html new file mode 100644 index 00000000000..84057b7ef66 --- /dev/null +++ b/website/layouts/support/list.html @@ -0,0 +1,28 @@ +{{ define "main" }} +
+
+
+ {{ range .Paginator.Pages }} +
+

Support and Training

+

Firms that offer consultancy and enterprise support.

+ +
+ {{ .Summary }} +
+ + {{ end}} + {{ template "_internal/pagination.html" . }} +
+
+
+ {{ end }} \ No newline at end of file diff --git a/website/layouts/support/single.html b/website/layouts/support/single.html new file mode 100644 index 00000000000..f0a955227ea --- /dev/null +++ b/website/layouts/support/single.html @@ -0,0 +1,14 @@ +{{ define "main" }} +
+
+
+
+

{{ .Title }}

+

+
+ {{ .Content }} +
+
+
+
+ {{ end }} \ No newline at end of file diff --git a/website/static/cloudraft.png b/website/static/cloudraft.png new file mode 100644 index 00000000000..c76ca7987f5 Binary files /dev/null and b/website/static/cloudraft.png differ diff --git a/website/static/logos/blinkit.png b/website/static/logos/blinkit.png new file mode 100644 index 00000000000..22e2add1af6 Binary files /dev/null and b/website/static/logos/blinkit.png differ diff --git a/website/static/logos/grofers.png b/website/static/logos/grofers.png deleted file mode 100644 index f73d67cd1b6..00000000000 Binary files a/website/static/logos/grofers.png and /dev/null differ diff --git a/website/static/o11y.svg b/website/static/o11y.svg new file mode 100644 index 00000000000..2163d8cf1bd --- /dev/null +++ b/website/static/o11y.svg @@ -0,0 +1,37 @@ + + + + + + + + + + + +