From bd7accb97db351f64c8304dbca0fb1a4c0811a3e Mon Sep 17 00:00:00 2001 From: Vanshika <102902652+Vanshikav123@users.noreply.github.com> Date: Fri, 22 Dec 2023 18:47:44 +0530 Subject: [PATCH 1/5] TraceID : Fetching TraceID (#6973) --- pkg/ui/react-app/src/pages/graph/Panel.tsx | 16 +++++++++++++--- .../src/pages/graph/QueryStatsView.test.tsx | 13 ++++++++----- .../react-app/src/pages/graph/QueryStatsView.tsx | 9 +++++---- pkg/ui/react-app/src/types/types.ts | 1 + 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pkg/ui/react-app/src/pages/graph/Panel.tsx b/pkg/ui/react-app/src/pages/graph/Panel.tsx index 354831d548a..2ba02d11312 100644 --- a/pkg/ui/react-app/src/pages/graph/Panel.tsx +++ b/pkg/ui/react-app/src/pages/graph/Panel.tsx @@ -231,6 +231,7 @@ class Panel extends Component { method: 'GET', headers: { 'Content-Type': 'application/json', + 'X-Thanos-Force-Tracing': 'true', // Conditionally add the header if the checkbox is enabled ...(this.props.options.forceTracing ? { 'X-Thanos-Force-Tracing': 'true' } : {}), }, @@ -238,8 +239,15 @@ class Panel extends Component { credentials: 'same-origin', signal: abortController.signal, }) - .then((resp) => resp.json()) - .then((json) => { + .then((resp) => { + return resp.json().then((json) => { + return { + json, + headers: resp.headers, + }; + }); + }) + .then(({ json, headers }) => { if (json.status !== 'success') { throw new Error(json.error || 'invalid response JSON'); } @@ -254,7 +262,7 @@ class Panel extends Component { } analysis = json.data.analysis; } - + const traceID = headers.get('X-Thanos-Trace-ID'); this.setState({ error: null, data: json.data, @@ -262,12 +270,14 @@ class Panel extends Component { startTime, endTime, resolution, + traceID: traceID ? traceID : '', }, warnings: json.warnings, stats: { loadTime: Date.now() - queryStart, resolution, resultSeries, + traceID, }, loading: false, analysis: analysis, diff --git a/pkg/ui/react-app/src/pages/graph/QueryStatsView.test.tsx b/pkg/ui/react-app/src/pages/graph/QueryStatsView.test.tsx index e04c914e1a3..4df5fc37c36 100755 --- a/pkg/ui/react-app/src/pages/graph/QueryStatsView.test.tsx +++ b/pkg/ui/react-app/src/pages/graph/QueryStatsView.test.tsx @@ -1,5 +1,5 @@ import * as React from 'react'; -import { shallow } from 'enzyme'; +import { mount } from 'enzyme'; import QueryStatsView from './QueryStatsView'; describe('QueryStatsView', () => { @@ -8,10 +8,13 @@ describe('QueryStatsView', () => { loadTime: 100, resolution: 5, resultSeries: 10000, + traceID: 'e575f9d4eab63a90cdc3dc4ef1b8dda0', }; - const queryStatsView = shallow(); - expect(queryStatsView.prop('className')).toEqual('query-stats'); - expect(queryStatsView.children().prop('className')).toEqual('float-right'); - expect(queryStatsView.children().text()).toEqual('Load time: 100ms   Resolution: 5s   Result series: 10000'); + const queryStatsView = mount(); + expect(queryStatsView.find('.query-stats').prop('className')).toEqual('query-stats'); + expect(queryStatsView.find('.float-right').prop('className')).toEqual('float-right'); + expect(queryStatsView.find('.float-right').html()).toEqual( + `Load time: ${queryStatsProps.loadTime}ms   Resolution: ${queryStatsProps.resolution}s   Result series: ${queryStatsProps.resultSeries}   Trace ID: ${queryStatsProps.traceID}` + ); }); }); diff --git a/pkg/ui/react-app/src/pages/graph/QueryStatsView.tsx b/pkg/ui/react-app/src/pages/graph/QueryStatsView.tsx index 8bfd91c74bd..d3e9a7a5c43 100644 --- a/pkg/ui/react-app/src/pages/graph/QueryStatsView.tsx +++ b/pkg/ui/react-app/src/pages/graph/QueryStatsView.tsx @@ -4,16 +4,17 @@ export interface QueryStats { loadTime: number; resolution: number; resultSeries: number; + traceID: string | null; } const QueryStatsView: FC = (props) => { - const { loadTime, resolution, resultSeries } = props; + const { loadTime, resolution, resultSeries, traceID } = props; + const prev = `Load time: ${loadTime}ms   Resolution: ${resolution}s   Result series: ${resultSeries}`; + const str = traceID ? prev + `   Trace ID: ${traceID}` : prev; return (
- - Load time: {loadTime}ms   Resolution: {resolution}s   Result series: {resultSeries} - +
); }; diff --git a/pkg/ui/react-app/src/types/types.ts b/pkg/ui/react-app/src/types/types.ts index ca31bc6cc44..08054fa1ac7 100644 --- a/pkg/ui/react-app/src/types/types.ts +++ b/pkg/ui/react-app/src/types/types.ts @@ -17,6 +17,7 @@ export interface QueryParams { startTime: number; endTime: number; resolution: number; + traceID: string; } export type Rule = { From d27365c5bdc339c699c8da5b1227c926cdfce2eb Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Sun, 24 Dec 2023 10:31:01 +0100 Subject: [PATCH 2/5] docs: add promcon 2023 thanos talks Signed-off-by: Michael Hoffmann --- docs/getting-started.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/getting-started.md b/docs/getting-started.md index 9e0a7a8ed05..da6b414472b 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -88,6 +88,10 @@ See up to date [jsonnet mixins](https://github.com/thanos-io/thanos/tree/main/mi ## Talks +* 2023 + * [Planetscale monitoring: Handling billions of active series with Prometheus and Thanos](https://www.youtube.com/watch?v=Or8r46fSaOg) + * [Taming the Tsunami: low latency ingestion of push-based metrics in Prometheus](https://www.youtube.com/watch?v=W81x1j765hc) + * 2022 * [Story of Correlation: Integrating Thanos Metrics with Observability Signals](https://www.youtube.com/watch?v=rWFb01GW0mQ) * [Running the Observability As a Service For Your Teams With Thanos](https://www.youtube.com/watch?v=I4Mfyfd_4M8) From 2d6acc5c1cb378d646fc90a919f1c4024b98f107 Mon Sep 17 00:00:00 2001 From: Kartikay <120778728+kartikaysaxena@users.noreply.github.com> Date: Mon, 25 Dec 2023 03:25:17 +0530 Subject: [PATCH 3/5] =?UTF-8?q?Added=20website=20page=20for=20companies=20?= =?UTF-8?q?who=20offer=20consultancy=20and=20enterprise=E2=80=A6=20(#7000)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added website page for companies who offer consultancy and enterprise support for Thanos Signed-off-by: Kartikay * adopters.yml revert Signed-off-by: Kartikay * retrigger checks Signed-off-by: Kartikay * added a new line in welcome.md Signed-off-by: Kartikay * retrigger checks Signed-off-by: Kartikay --------- Signed-off-by: Kartikay --- .mdox.yaml | 4 ++ docs/support/welcome.md | 6 +++ website/data/adopters.yml | 2 +- website/layouts/_default/baseof.html | 3 ++ .../partials/versioning/version-picker.html | 1 + website/layouts/support/list.html | 28 +++++++++++++ website/layouts/support/single.html | 14 +++++++ website/static/cloudraft.png | Bin 0 -> 1883 bytes website/static/o11y.svg | 37 ++++++++++++++++++ 9 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 docs/support/welcome.md create mode 100644 website/layouts/support/list.html create mode 100644 website/layouts/support/single.html create mode 100644 website/static/cloudraft.png create mode 100644 website/static/o11y.svg diff --git a/.mdox.yaml b/.mdox.yaml index 4f8a0be006c..1c9c4dd589c 100644 --- a/.mdox.yaml +++ b/.mdox.yaml @@ -72,6 +72,10 @@ transformations: backMatter: *docBackMatter # Non-versioned element: Blog. + + - glob: "support/*" + path: /../support/* + - glob: "blog/*" path: /../blog/* diff --git a/docs/support/welcome.md b/docs/support/welcome.md new file mode 100644 index 00000000000..c4e48c55823 --- /dev/null +++ b/docs/support/welcome.md @@ -0,0 +1,6 @@ +--- +title: Welcome to Support and Training! +author: Thanos Team +--- + +Anyone who has developed a Thanos training program or offers related services can add themselves to this page by opening a pull request against it. diff --git a/website/data/adopters.yml b/website/data/adopters.yml index 7605b38f59e..1ae6e5b2645 100644 --- a/website/data/adopters.yml +++ b/website/data/adopters.yml @@ -236,4 +236,4 @@ adopters: logo: grupo-olx.png - name: TrueLayer url: https://truelayer.com/ - logo: truelayer.png + logo: truelayer.png \ No newline at end of file diff --git a/website/layouts/_default/baseof.html b/website/layouts/_default/baseof.html index 4fd3803768a..83b539295be 100644 --- a/website/layouts/_default/baseof.html +++ b/website/layouts/_default/baseof.html @@ -46,6 +46,9 @@ + diff --git a/website/layouts/partials/versioning/version-picker.html b/website/layouts/partials/versioning/version-picker.html index 732afd453d7..f2f4fa9c1e6 100644 --- a/website/layouts/partials/versioning/version-picker.html +++ b/website/layouts/partials/versioning/version-picker.html @@ -4,6 +4,7 @@ {{- range .Site.Sections.Reverse }} {{- $version := .Section }} {{- if eq $version "blog" }}{{continue}}{{end}} + {{- if eq $version "support" }}{{continue}}{{end}} {{ $version }} diff --git a/website/layouts/support/list.html b/website/layouts/support/list.html new file mode 100644 index 00000000000..84057b7ef66 --- /dev/null +++ b/website/layouts/support/list.html @@ -0,0 +1,28 @@ +{{ define "main" }} +
+
+
+ {{ range .Paginator.Pages }} +
+

Support and Training

+

Firms that offer consultancy and enterprise support.

+ +
+ {{ .Summary }} +
+ + {{ end}} + {{ template "_internal/pagination.html" . }} +
+
+
+ {{ end }} \ No newline at end of file diff --git a/website/layouts/support/single.html b/website/layouts/support/single.html new file mode 100644 index 00000000000..f0a955227ea --- /dev/null +++ b/website/layouts/support/single.html @@ -0,0 +1,14 @@ +{{ define "main" }} +
+
+
+
+

{{ .Title }}

+

+
+ {{ .Content }} +
+
+
+
+ {{ end }} \ No newline at end of file diff --git a/website/static/cloudraft.png b/website/static/cloudraft.png new file mode 100644 index 0000000000000000000000000000000000000000..c76ca7987f5cd70f35a42410515cca131540e5b9 GIT binary patch literal 1883 zcmV-h2c-CkP)5<7Wvxj<7d zDbPI{?CwcZ<|-l`-`5awu6(&M3|a0Wtk1tfK3xAX^(m>)Uw&*m^-=aEFm<@A!_PXr zcZ6*n3hXPyAyz&;U3Y`=v2}Px_x->7G#2UHqN$6N#xZE>xrn@m%hZIIIy@k*tqx%m z%H;YjFf0!~Z>qN;6oUI@O0XUI-eu$)hKFN?tP%^_qF)g$+6_4t264)Is8S5{$g(W- zSaH3Jye36H8E&D+WtMq|VK70y4tW^l$#vb4_j6|6GQ4~`a`VsF@3>_h-q4tCY)V{- zwDgx6Qhf0zf=7d9lui_v_4+e;YUZ>O?bj!-$Wn< z>@z*A&y3PPMx-5crZs5GK&F#Jw+sgRGsqUY8CQ;`7J4l5W?CD0jX~2^DYKtL+U(|n zGA;55`)O{T2IKb2E*vr}23Zjlx6l!@uCN%lazYtm9VUcDd;PAtzqu$anT5_=*4f0LaKI8zJs$2$5JC{Ds@8b62q=`JXoYKEgA&s@Te&gf3 zL53iOwOBBuwBhcS67$50_;=(ZO}U))*SQm#tOnyDXYvwy3N-gu2EWP4<_i`KJ*Sll zo3bVG9N=w{jK=wNj8eo6mJ4q)H2!!j)6!ty=b&ZXaeaxMEIiX%q)$Ma*}BBUkkT4) zd}{A+Dq%+(LXKZS{wvJ4e)^Dd84NS%Ni5pl=@dt&L8D0j9J!txi0n8dQ@=zSG?B$V z_MySUVM0D)c~lsNP(hdM+z1D#SjY_B_5^y4$~)|+98%bO=#gIh&5<-RUz(MtXtT+e zPM@$m42B_W@CyGADn;6GS<(m1N+}9USpT=e@r@b0RrT86SCLp=g*d+mmp3V-w9*tZ zm?wv3#avU?bmYG@E|5kA-79IdUGfj;_c|A7+M{nROeQjW!Rd)Ys?s`~DWtTOI5wE) z+AVaZSmcj@p@VLz`;ePbx+t;#T4xNo&s=$ZyBJt$Y1EUO zu5U0uN;IZEHB!!=VR`K<;*?0v>?zv;*Fl+6U~qU_;8?r`W%og83LmeqdpRIB$o=DU z?9Wj+v|o2hbx!nQG2Sas1=_Ehr6|uW;=f2Bp@BlaCgu>2EOW}&0bkI5jWGs?^jGsX z(A8@p(ihNifp&lH!(gWG)5=C9K?~@JQFM!OD@S%@3Yt+iR9NlW{wAU9HPQwNB2{T= zfO!bYjUA*N(mU8abV|4t=?(N~>c?QFN^7wHMF48tz6P~u6#h9P?UxcsJvRbt3Fa?YFzGf6@hsHrsr{U&` z3cK|HeKLj>weW#{qqP>VOMXMMwM=$yi+CvQR*|08jeXx1qP026KoxKNxxxqDTg@{3 z?(e|_d6d|_44>jj{d}bC63v=B^oVkqIcqO^272s%`LT{U@Y|Hz@TmP*0m;Ec1LLuE z^jgHfA&nC4$3Il(Q#$!3$3g!|NlI^f^1V>z83qHL44S-YNic-df;!1YQLc7|Wgelg zKPjum6lWYVOgXeoz{g7W!^^yks6xLqFs(-66yodb@~DEtg!zd2H4!33q}NDN*TEBfZ@J~zjU~tom*Zf + + + + + + + + + + + From 665e64370a2cdfb30a7059c26ba7bdbae9a26309 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 24 Dec 2023 13:56:00 -0800 Subject: [PATCH 4/5] Lazy downloaded index header (#6984) * lazy downloaded index header Signed-off-by: Ben Ye * update tests Signed-off-by: Ben Ye * address comments Signed-off-by: Ben Ye * address comments Signed-off-by: Ben Ye * changelog Signed-off-by: Ben Ye --------- Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + cmd/thanos/store.go | 10 + docs/components/store.md | 6 + pkg/block/indexheader/header_test.go | 2 +- pkg/block/indexheader/lazy_binary_reader.go | 7 +- .../indexheader/lazy_binary_reader_test.go | 336 ++++++++++-------- pkg/block/indexheader/reader_pool.go | 46 ++- pkg/block/indexheader/reader_pool_test.go | 13 +- pkg/store/bucket.go | 44 ++- pkg/store/bucket_test.go | 2 +- 10 files changed, 294 insertions(+), 173 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df6e93e4cda..64b849f9b17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs. - [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule. - [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Apply series limit when streaming series for series actually matched if lazy postings is enabled. +- [#6984](https://github.com/thanos-io/thanos/pull/6984) Store Gateway: Added `--store.index-header-lazy-download-strategy` to specify how to lazily download index headers when lazy mmap is enabled. ### Changed diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 9191b77ce8e..7d80687ec30 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -28,6 +28,7 @@ import ( blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" hidden "github.com/thanos-io/thanos/pkg/extflag" @@ -89,6 +90,8 @@ type storeConfig struct { lazyIndexReaderEnabled bool lazyIndexReaderIdleTimeout time.Duration lazyExpandedPostingsEnabled bool + + indexHeaderLazyDownloadStrategy string } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -186,6 +189,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) + cmd.Flag("store.index-header-lazy-download-strategy", "Strategy of how to download index headers lazily. Supported values: eager, lazy. If eager, always download index header during initial load. If lazy, download index header during query time."). + Default(string(indexheader.EagerDownloadStrategy)). + EnumVar(&sc.indexHeaderLazyDownloadStrategy, string(indexheader.EagerDownloadStrategy), string(indexheader.LazyDownloadStrategy)) + cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb) cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). @@ -388,6 +395,9 @@ func runStore( return conf.estimatedMaxChunkSize }), store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), + store.WithIndexHeaderLazyDownloadStrategy( + indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(), + ), } if conf.debugLogging { diff --git a/docs/components/store.md b/docs/components/store.md index 3ba59a2d9e0..85fd4ce6886 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -193,6 +193,12 @@ Flags: DEPRECATED: use store.limits.request-samples. --store.grpc.touched-series-limit=0 DEPRECATED: use store.limits.request-series. + --store.index-header-lazy-download-strategy=eager + Strategy of how to download index headers + lazily. Supported values: eager, lazy. + If eager, always download index header during + initial load. If lazy, download index header + during query time. --store.limits.request-samples=0 The maximum samples allowed for a single Series request, The Series call fails if diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 56dabc33f7c..4130157a96a 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -206,7 +206,7 @@ func TestReaders(t *testing.T) { _, err := WriteBinary(ctx, bkt, id, fn) testutil.Ok(t, err) - br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil) + br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil, false) testutil.Ok(t, err) defer func() { testutil.Ok(t, br.Close()) }() diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index d7e589c724f..2b36bf80259 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -83,6 +83,9 @@ type LazyBinaryReader struct { // Keep track of the last time it was used. usedAt *atomic.Int64 + + // If true, index header will be downloaded at query time rather than initialization time. + lazyDownload bool } // NewLazyBinaryReader makes a new LazyBinaryReader. If the index-header does not exist @@ -99,8 +102,9 @@ func NewLazyBinaryReader( metrics *LazyBinaryReaderMetrics, binaryReaderMetrics *BinaryReaderMetrics, onClosed func(*LazyBinaryReader), + lazyDownload bool, ) (*LazyBinaryReader, error) { - if dir != "" { + if dir != "" && !lazyDownload { indexHeaderFile := filepath.Join(dir, id.String(), block.IndexHeaderFilename) // If the index-header doesn't exist we should download it. if _, err := os.Stat(indexHeaderFile); err != nil { @@ -131,6 +135,7 @@ func NewLazyBinaryReader( binaryReaderMetrics: binaryReaderMetrics, usedAt: atomic.NewInt64(time.Now().UnixNano()), onClosed: onClosed, + lazyDownload: lazyDownload, }, nil } diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index 150f2d649b3..d740da99abd 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -5,6 +5,7 @@ package indexheader import ( "context" + "fmt" "os" "path/filepath" "sync" @@ -31,11 +32,11 @@ func TestNewLazyBinaryReader_ShouldFailIfUnableToBuildIndexHeader(t *testing.T) bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) testutil.Ok(t, err) defer func() { testutil.Ok(t, bkt.Close()) }() - _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil) + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil, false) testutil.NotOk(t, err) } -func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { +func TestNewLazyBinaryReader_ShouldNotFailIfUnableToBuildIndexHeaderWhenLazyDownload(t *testing.T) { ctx := context.Background() tmpDir := t.TempDir() @@ -43,36 +44,61 @@ func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) testutil.Ok(t, err) defer func() { testutil.Ok(t, bkt.Close()) }() - - // Create block. - blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ - {{Name: "a", Value: "1"}}, - {{Name: "a", Value: "2"}}, - }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) + _, err = NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, ulid.MustNew(0, nil), 3, NewLazyBinaryReaderMetrics(nil), NewBinaryReaderMetrics(nil), nil, true) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) +} - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) +func TestNewLazyBinaryReader_ShouldBuildIndexHeaderFromBucket(t *testing.T) { + ctx := context.Background() - // Should lazy load the index upon first usage. - v, err := r.IndexVersion() - testutil.Ok(t, err) - testutil.Equals(t, 2, v) - testutil.Assert(t, r.reader != nil) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + tmpDir := t.TempDir() - labelNames, err := r.LabelNames() + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + defer func() { testutil.Ok(t, bkt.Close()) }() + + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) + + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + _, err = os.Stat(filepath.Join(r.dir, blockID.String(), block.IndexHeaderFilename)) + // Index file shouldn't exist. + if lazyDownload { + testutil.Equals(t, true, os.IsNotExist(err)) + } + // Should lazy load the index upon first usage. + v, err := r.IndexVersion() + testutil.Ok(t, err) + if lazyDownload { + _, err = os.Stat(filepath.Join(r.dir, blockID.String(), block.IndexHeaderFilename)) + testutil.Ok(t, err) + } + testutil.Equals(t, 2, v) + testutil.Assert(t, r.reader != nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + }) + } } func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { @@ -96,22 +122,26 @@ func TestNewLazyBinaryReader_ShouldRebuildCorruptedIndexHeader(t *testing.T) { headerFilename := filepath.Join(tmpDir, blockID.String(), block.IndexHeaderFilename) testutil.Ok(t, os.WriteFile(headerFilename, []byte("xxx"), os.ModePerm)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - - // Ensure it can read data. - labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + + // Ensure it can read data. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + }) + } } func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { @@ -131,37 +161,41 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - - // Should lazy load the index upon first usage. - labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - - // Close it. - testutil.Ok(t, r.Close()) - testutil.Assert(t, r.reader == nil) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Should lazy load again upon next usage. - labelNames, err = r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - - // Closing an already closed lazy reader should be a no-op. - for i := 0; i < 2; i++ { - testutil.Ok(t, r.Close()) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + + // Close it. + testutil.Ok(t, r.Close()) + testutil.Assert(t, r.reader == nil) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Should lazy load again upon next usage. + labelNames, err = r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + + // Closing an already closed lazy reader should be a no-op. + for i := 0; i < 2; i++ { + testutil.Ok(t, r.Close()) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + } + }) } } @@ -182,34 +216,38 @@ func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - - // Should lazy load the index upon first usage. - labelNames, err := r.LabelNames() - testutil.Ok(t, err) - testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Try to unload but not idle since enough time. - testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano())) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) - - // Try to unload and idle since enough time. - testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano())) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. + labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Try to unload but not idle since enough time. + testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano())) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Try to unload and idle since enough time. + testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano())) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + }) + } } func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { @@ -232,49 +270,53 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - m := NewLazyBinaryReaderMetrics(nil) - bm := NewBinaryReaderMetrics(nil) - r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil) - testutil.Ok(t, err) - testutil.Assert(t, r.reader == nil) - t.Cleanup(func() { - testutil.Ok(t, r.Close()) - }) - - done := make(chan struct{}) - time.AfterFunc(runDuration, func() { close(done) }) - wg := sync.WaitGroup{} - wg.Add(2) - - // Start a goroutine which continuously try to unload the reader. - go func() { - defer wg.Done() - - for { - select { - case <-done: - return - default: - testutil.Ok(t, r.unloadIfIdleSince(0)) - } - } - }() - - // Try to read multiple times, while the other goroutine continuously try to unload it. - go func() { - defer wg.Done() - - for { - select { - case <-done: - return - default: - _, err := r.PostingsOffset("a", "1") - testutil.Assert(t, err == nil || err == errUnloadedWhileLoading) - } - } - }() - - // Wait until both goroutines have done. - wg.Wait() + for _, lazyDownload := range []bool{false, true} { + t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) { + m := NewLazyBinaryReaderMetrics(nil) + bm := NewBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + t.Cleanup(func() { + testutil.Ok(t, r.Close()) + }) + + done := make(chan struct{}) + time.AfterFunc(runDuration, func() { close(done) }) + wg := sync.WaitGroup{} + wg.Add(2) + + // Start a goroutine which continuously try to unload the reader. + go func() { + defer wg.Done() + + for { + select { + case <-done: + return + default: + testutil.Ok(t, r.unloadIfIdleSince(0)) + } + } + }() + + // Try to read multiple times, while the other goroutine continuously try to unload it. + go func() { + defer wg.Done() + + for { + select { + case <-done: + return + default: + _, err := r.PostingsOffset("a", "1") + testutil.Assert(t, err == nil || err == errUnloadedWhileLoading) + } + } + }() + + // Wait until both goroutines have done. + wg.Wait() + }) + } } diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index fc8cb268139..e9fe5eb7dca 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -14,6 +14,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + + "github.com/thanos-io/thanos/pkg/block/metadata" ) // ReaderPoolMetrics holds metrics tracked by ReaderPool. @@ -46,10 +48,47 @@ type ReaderPool struct { // Keep track of all readers managed by the pool. lazyReadersMx sync.Mutex lazyReaders map[*LazyBinaryReader]struct{} + + lazyDownloadFunc LazyDownloadIndexHeaderFunc +} + +// IndexHeaderLazyDownloadStrategy specifies how to download index headers +// lazily. Only used when lazy mmap is enabled. +type IndexHeaderLazyDownloadStrategy string + +const ( + // EagerDownloadStrategy always disables lazy downloading index headers. + EagerDownloadStrategy IndexHeaderLazyDownloadStrategy = "eager" + // LazyDownloadStrategy always lazily download index headers. + LazyDownloadStrategy IndexHeaderLazyDownloadStrategy = "lazy" +) + +func (s IndexHeaderLazyDownloadStrategy) StrategyToDownloadFunc() LazyDownloadIndexHeaderFunc { + switch s { + case LazyDownloadStrategy: + return AlwaysLazyDownloadIndexHeader + default: + // Always fallback to eager download index header. + return AlwaysEagerDownloadIndexHeader + } +} + +// LazyDownloadIndexHeaderFunc is used to determinte whether to download the index header lazily +// or not by checking its block metadata. Usecase can be by time or by index file size. +type LazyDownloadIndexHeaderFunc func(meta *metadata.Meta) bool + +// AlwaysEagerDownloadIndexHeader always eagerly download index header. +func AlwaysEagerDownloadIndexHeader(meta *metadata.Meta) bool { + return false +} + +// AlwaysLazyDownloadIndexHeader always lazily download index header. +func AlwaysLazyDownloadIndexHeader(meta *metadata.Meta) bool { + return true } // NewReaderPool makes a new ReaderPool. -func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics) *ReaderPool { +func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics, lazyDownloadFunc LazyDownloadIndexHeaderFunc) *ReaderPool { p := &ReaderPool{ logger: logger, metrics: metrics, @@ -57,6 +96,7 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime lazyReaderIdleTimeout: lazyReaderIdleTimeout, lazyReaders: make(map[*LazyBinaryReader]struct{}), close: make(chan struct{}), + lazyDownloadFunc: lazyDownloadFunc, } // Start a goroutine to close idle readers (only if required). @@ -81,12 +121,12 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime // NewBinaryReader creates and returns a new binary reader. If the pool has been configured // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader // is tracked by the pool and automatically closed once the idle timeout expires. -func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (Reader, error) { +func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, meta *metadata.Meta) (Reader, error) { var reader Reader var err error if p.lazyReaderEnabled { - reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed) + reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta)) } else { reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader) } diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go index 4ed60ea8fb4..a7445f0fed2 100644 --- a/pkg/block/indexheader/reader_pool_test.go +++ b/pkg/block/indexheader/reader_pool_test.go @@ -54,12 +54,15 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) + meta, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String())) + testutil.Ok(t, err) + for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil)) + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil), AlwaysEagerDownloadIndexHeader) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta) testutil.Ok(t, err) defer func() { testutil.Ok(t, r.Close()) }() @@ -89,12 +92,14 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) + meta, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String())) + testutil.Ok(t, err) metrics := NewReaderPoolMetrics(nil) - pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics) + pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics, AlwaysEagerDownloadIndexHeader) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta) testutil.Ok(t, err) defer func() { testutil.Ok(t, r.Close()) }() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 3ebd6f06a47..fd4fb7392c4 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -413,6 +413,8 @@ type BucketStore struct { blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator + + indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc } func (s *BucketStore) validate() error { @@ -531,6 +533,14 @@ func WithDontResort(true bool) BucketStoreOption { } } +// WithIndexHeaderLazyDownloadStrategy specifies what block to lazy download its index header. +// Only used when lazy mmap is enabled at the same time. +func WithIndexHeaderLazyDownloadStrategy(strategy indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption { + return func(s *BucketStore) { + s.indexHeaderLazyDownloadStrategy = strategy + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -559,21 +569,22 @@ func NewBucketStore( b := make([]byte, 0, initialBufSize) return &b }}, - chunkPool: pool.NoopBytes{}, - blocks: map[ulid.ULID]*bucketBlock{}, - blockSets: map[uint64]*bucketBlockSet{}, - blockSyncConcurrency: blockSyncConcurrency, - queryGate: gate.NewNoop(), - chunksLimiterFactory: chunksLimiterFactory, - seriesLimiterFactory: seriesLimiterFactory, - bytesLimiterFactory: bytesLimiterFactory, - partitioner: partitioner, - enableCompatibilityLabel: enableCompatibilityLabel, - postingOffsetsInMemSampling: postingOffsetsInMemSampling, - enableSeriesResponseHints: enableSeriesResponseHints, - enableChunkHashCalculation: enableChunkHashCalculation, - seriesBatchSize: SeriesBatchSize, - sortingStrategy: sortingStrategyStore, + chunkPool: pool.NoopBytes{}, + blocks: map[ulid.ULID]*bucketBlock{}, + blockSets: map[uint64]*bucketBlockSet{}, + blockSyncConcurrency: blockSyncConcurrency, + queryGate: gate.NewNoop(), + chunksLimiterFactory: chunksLimiterFactory, + seriesLimiterFactory: seriesLimiterFactory, + bytesLimiterFactory: bytesLimiterFactory, + partitioner: partitioner, + enableCompatibilityLabel: enableCompatibilityLabel, + postingOffsetsInMemSampling: postingOffsetsInMemSampling, + enableSeriesResponseHints: enableSeriesResponseHints, + enableChunkHashCalculation: enableChunkHashCalculation, + seriesBatchSize: SeriesBatchSize, + sortingStrategy: sortingStrategyStore, + indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader, } for _, option := range options { @@ -582,7 +593,7 @@ func NewBucketStore( // Depend on the options indexReaderPoolMetrics := indexheader.NewReaderPoolMetrics(extprom.WrapRegistererWithPrefix("thanos_bucket_store_", s.reg)) - s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics) + s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics, s.indexHeaderLazyDownloadStrategy) s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too if err := s.validate(); err != nil { @@ -759,6 +770,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er s.dir, meta.ULID, s.postingOffsetsInMemSampling, + meta, ) if err != nil { return errors.Wrap(err, "create index header reader") diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 87659f54503..67223a9467f 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1658,7 +1658,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { bkt: objstore.WithNoopInstr(bkt), logger: logger, indexCache: indexCache, - indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil)), + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil), indexheader.AlwaysEagerDownloadIndexHeader), metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{{b1, b2}}}, From a59a3ef4f2ed1435c8144d69ae36efcfce0cb83b Mon Sep 17 00:00:00 2001 From: Pranav <101933072+pawarpranav83@users.noreply.github.com> Date: Mon, 25 Dec 2023 14:22:06 +0530 Subject: [PATCH 5/5] tests: use remote write in query frontend tests (#6998) --- test/e2e/query_frontend_test.go | 116 +++++++++++++++++++++----------- test/e2e/query_test.go | 34 ++++++++++ 2 files changed, 112 insertions(+), 38 deletions(-) diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index f93219cae6f..5fda32c0f7b 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/prompb" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/cacheutil" @@ -41,12 +42,13 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - now := time.Now() + // Predefined Timestamp + predefTimestamp := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC) - prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "") - testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) - q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init() + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q)) inMemoryCacheConfig := queryfrontend.CacheProviderConfig{ @@ -64,17 +66,34 @@ func TestQueryFrontend(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) t.Cleanup(cancel) + // Writing a custom Timeseries into the receiver + testutil.Ok(t, remoteWrite(ctx, []prompb.TimeSeries{{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "up"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "test"}, + {Name: "replica", Value: "0"}, + }, + Samples: []prompb.Sample{ + {Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)}, + }}}, + i.Endpoint("remote-write"), + )) + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) // Ensure we can get the result from Querier first so that it // doesn't need to retry when we send queries to the frontend later. - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -86,13 +105,15 @@ func TestQueryFrontend(t *testing.T) { queryTimes := vals[0] t.Run("query frontend works for instant query", func(t *testing.T) { - queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -115,8 +136,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -159,8 +180,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -181,7 +202,7 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_fetched_keys_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "cortex_cache_hits_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_gets_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total")) @@ -192,9 +213,8 @@ func TestQueryFrontend(t *testing.T) { e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "query_range"))), ) - // One more request is needed in order to satisfy the req range. testutil.Ok(t, q.WaitSumMetricsWithOptions( - e2emon.Equals(2), + e2emon.Equals(1), []string{"http_requests_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))), ) @@ -206,8 +226,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(24*time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(24*time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -225,13 +245,13 @@ func TestQueryFrontend(t *testing.T) { []string{"thanos_query_frontend_queries_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "op", "query_range"))), ) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "cortex_cache_fetched_keys_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "cortex_cache_fetched_keys_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "cortex_cache_hits_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_added_new_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_added_new_total")) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_added_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_entries")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(3), "querier_cache_gets_total")) - testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "querier_cache_misses_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_entries")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(4), "querier_cache_gets_total")) + testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(2), "querier_cache_misses_total")) // Query is 25h so it will be split to 2 requests. testutil.Ok(t, queryFrontend.WaitSumMetricsWithOptions( @@ -240,7 +260,7 @@ func TestQueryFrontend(t *testing.T) { ) testutil.Ok(t, q.WaitSumMetricsWithOptions( - e2emon.Equals(4), + e2emon.Equals(3), []string{"http_requests_total"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "handler", "query_range"))), ) @@ -248,7 +268,7 @@ func TestQueryFrontend(t *testing.T) { t.Run("query frontend splitting works for labels names API", func(t *testing.T) { // LabelNames and LabelValues API should still work via query frontend. - labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(predefTimestamp.Add(-time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) > 0 }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -267,7 +287,7 @@ func TestQueryFrontend(t *testing.T) { e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "labels"))), ) - labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelNames(t, ctx, queryFrontend.Endpoint("http"), nil, timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) > 0 }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -288,7 +308,7 @@ func TestQueryFrontend(t *testing.T) { }) t.Run("query frontend splitting works for labels values API", func(t *testing.T) { - labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(predefTimestamp.Add(-time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) == 1 && res[0] == "localhost:9090" }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -307,7 +327,7 @@ func TestQueryFrontend(t *testing.T) { e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tripperware", "labels"))), ) - labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool { + labelValues(t, ctx, queryFrontend.Endpoint("http"), "instance", nil, timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []string) bool { return len(res) == 1 && res[0] == "localhost:9090" }) testutil.Ok(t, q.WaitSumMetricsWithOptions( @@ -333,8 +353,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")}, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []map[string]string) bool { if len(res) != 1 { return false @@ -345,6 +365,8 @@ func TestQueryFrontend(t *testing.T) { "instance": "localhost:9090", "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", + "tenant_id": "default-tenant", }) }, ) @@ -369,8 +391,8 @@ func TestQueryFrontend(t *testing.T) { ctx, queryFrontend.Endpoint("http"), []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "up")}, - timestamp.FromTime(now.Add(-24*time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-24*time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), func(res []map[string]string) bool { if len(res) != 1 { return false @@ -381,6 +403,8 @@ func TestQueryFrontend(t *testing.T) { "instance": "localhost:9090", "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", + "tenant_id": "default-tenant", }) }, ) @@ -409,12 +433,13 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - now := time.Now() + // Predefined timestamp + predefTimestamp := time.Date(2023, time.December, 22, 12, 0, 0, 0, time.UTC) - prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "1", e2ethanos.DefaultPromConfig("test", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "") - testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + i := e2ethanos.NewReceiveBuilder(e, "ingestor-rw").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) - q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).Init() + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() testutil.Ok(t, e2e.StartAndWaitReady(q)) memcached := e2ethanos.NewMemcached(e, "1") @@ -443,19 +468,34 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) t.Cleanup(cancel) + testutil.Ok(t, remoteWrite(ctx, []prompb.TimeSeries{{ + Labels: []prompb.Label{ + {Name: "__name__", Value: "up"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "test"}, + {Name: "replica", Value: "0"}, + }, + Samples: []prompb.Sample{ + {Value: float64(0), Timestamp: timestamp.FromTime(predefTimestamp)}, + }}}, + i.Endpoint("remote-write"))) + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) testutil.Ok(t, queryFrontend.WaitSumMetrics(e2emon.Equals(1), "cortex_memcache_client_servers")) // Ensure we can get the result from Querier first so that it // doesn't need to retry when we send queries to the frontend later. - queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, func() time.Time { return predefTimestamp }, promclient.QueryOptions{ Deduplicate: false, }, []model.Metric{ { "job": "myself", "prometheus": "test", + "receive": "receive-ingestor-rw", "replica": "0", + "tenant_id": "default-tenant", }, }) @@ -469,8 +509,8 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, @@ -501,8 +541,8 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { ctx, queryFrontend.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, - timestamp.FromTime(now.Add(-time.Hour)), - timestamp.FromTime(now.Add(time.Hour)), + timestamp.FromTime(predefTimestamp.Add(-time.Hour)), + timestamp.FromTime(predefTimestamp.Add(time.Hour)), 14, promclient.QueryOptions{ Deduplicate: true, diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 6584c7b8426..5b9a120b902 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -4,6 +4,7 @@ package e2e_test import ( + "bytes" "context" "fmt" "io" @@ -1720,6 +1721,39 @@ func rangeQuery(t *testing.T, ctx context.Context, addr string, q func() string, return retExplanation } +// Performs a remote write at the receiver external endpoint. +func remoteWrite(ctx context.Context, timeseries []prompb.TimeSeries, addr string) error { + // Create write request + data, err := proto.Marshal(&prompb.WriteRequest{Timeseries: timeseries}) + if err != nil { + return err + } + + // Create HTTP request + compressed := snappy.Encode(nil, data) + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/receive", addr), bytes.NewReader(compressed)) + if err != nil { + return err + } + + req.Header.Add("Content-Encoding", "snappy") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + // Execute HTTP request + res, err := promclient.NewDefaultClient().HTTPClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer runutil.ExhaustCloseWithErrCapture(&err, res.Body, "%s: close body", req.URL.String()) + + if res.StatusCode/100 != 2 { + return errors.Errorf("request failed with code %s", res.Status) + } + + return nil +} + func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, end int64, check func(data []*exemplarspb.ExemplarData) error) { t.Helper()