merge oss main on 2024-11-07
jnyi committed Nov 7, 2024
2 parents a5e5f7a + 928bc7a commit 28357e9
Showing 42 changed files with 847 additions and 375 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -24,6 +24,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7821](https://github.com/thanos-io/thanos/pull/7821) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.
- [#7843](https://github.com/thanos-io/thanos/pull/7843) Query Frontend: fix slow query logging for non-query endpoints.
- [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging.
- [#7832](https://github.com/thanos-io/thanos/pull/7832) Query Frontend: Fix cache keys for dynamic split intervals.
- [#7885](https://github.com/thanos-io/thanos/pull/7885) Store: Return chunks to the pool after completing a Series call.

### Added
- [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.
@@ -35,6 +37,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7853](https://github.com/thanos-io/thanos/pull/7853) UI: Add support for selecting graph time range with mouse drag.
- [#7855](https://github.com/thanos-io/thanos/pull/7855) Compact/Query: Add support for comma-separated replica labels.
- [#7654](https://github.com/thanos-io/thanos/pull/7654) *: Add '--grpc-server-tls-min-version' flag to allow user to specify TLS version, otherwise default to TLS 1.3
- [#7854](https://github.com/thanos-io/thanos/pull/7854) Query Frontend: Add `--query-frontend.force-query-stats` flag to force collection of query statistics from upstream queriers.

### Changed

2 changes: 1 addition & 1 deletion Dockerfile.e2e-tests
@@ -1,5 +1,5 @@
# Taking a non-alpine image for e2e tests so that cgo can be enabled for the race detector.
FROM golang:1.23.2 as builder
FROM golang:1.23.3 as builder

WORKDIR $GOPATH/src/github.com/thanos-io/thanos

2 changes: 1 addition & 1 deletion Dockerfile.multi-stage
@@ -1,6 +1,6 @@
# By default we pin to amd64 sha. Use make docker to automatically adjust for arm64 versions.
ARG BASE_DOCKER_SHA="14d68ca3d69fceaa6224250c83d81d935c053fb13594c811038c461194599973"
FROM golang:1.23.2-alpine3.20 as builder
FROM golang:1.23.3-alpine3.20 as builder

WORKDIR $GOPATH/src/github.com/thanos-io/thanos
# Change in the docker context invalidates the cache so to leverage docker
6 changes: 6 additions & 0 deletions cmd/thanos/compact.go
@@ -290,6 +290,11 @@ func runCompact(
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
api.SetLoaded(blocks, err)
})

var syncMetasTimeout = conf.waitInterval
if !conf.wait {
syncMetasTimeout = 0
}
sy, err = compact.NewMetaSyncer(
logger,
reg,
@@ -299,6 +304,7 @@
ignoreDeletionMarkFilter,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
syncMetasTimeout,
)
if err != nil {
return errors.Wrap(err, "create syncer")
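The compact.go hunk above wires a per-sync timeout into the meta syncer: with `--wait` the sync is bounded by the wait interval, while one-off runs pass `0` (presumably meaning no per-sync timeout, matching the `0` passed by the bucket tools further down). A minimal sketch of that selection logic; `pickSyncMetasTimeout` is a hypothetical helper used only for illustration:

```go
package main

import (
	"fmt"
	"time"
)

// pickSyncMetasTimeout mirrors the logic added in runCompact: bound each
// metadata sync by the wait interval only when the compactor runs in --wait
// mode; a zero duration is assumed to mean "no timeout".
func pickSyncMetasTimeout(wait bool, waitInterval time.Duration) time.Duration {
	if !wait {
		return 0
	}
	return waitInterval
}

func main() {
	fmt.Println(pickSyncMetasTimeout(true, 5*time.Minute))  // 5m0s
	fmt.Println(pickSyncMetasTimeout(false, 5*time.Minute)) // 0s
}
```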
15 changes: 8 additions & 7 deletions cmd/thanos/query.go
@@ -518,6 +518,7 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
),
dnsSDInterval,
logger,
)

dnsEndpointProvider := dns.NewProvider(
@@ -633,7 +634,7 @@ func runQuery(
fileSDCache.Update(update)
endpoints.Update(ctxUpdate)

if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
}

@@ -653,22 +654,22 @@
return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error {
resolveCtx, resolveCancel := context.WithTimeout(ctx, dnsSDInterval)
defer resolveCancel()
if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
}
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil {
if err := dnsTargetProvider.Resolve(ctx, targetAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err)
}
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
}
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs); err != nil {
if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err)
}
if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs); err != nil {
if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err)

}
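Every `Resolve` call on the DNS providers in this file now takes a third boolean argument, passed as `true` at each call site (its parameter name and exact semantics are not visible in this hunk). The surrounding pattern is unchanged: a periodic loop in which each iteration resolves under its own timeout bounded by the SD interval, and failures are only logged. A self-contained sketch of that pattern, with the `resolver` interface, `printResolver`, and `resolveLoop` names invented for illustration:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// resolver stands in for the Thanos dns.Provider; the third argument mirrors
// the new bool passed as true at every call site above.
type resolver interface {
	Resolve(ctx context.Context, addrs []string, flag bool) error
}

type printResolver struct{}

func (printResolver) Resolve(_ context.Context, addrs []string, flag bool) error {
	fmt.Println("resolving", addrs, "flag:", flag)
	return nil
}

// resolveLoop sketches the runQuery pattern: each iteration gets its own
// timeout bounded by the SD interval, and resolution errors are only logged.
func resolveLoop(ctx context.Context, r resolver, interval time.Duration, addrs []string) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			resolveCtx, cancel := context.WithTimeout(ctx, interval)
			if err := r.Resolve(resolveCtx, addrs, true); err != nil {
				fmt.Println("failed to resolve addresses:", err)
			}
			cancel()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	resolveLoop(ctx, printResolver{}, 100*time.Millisecond, []string{"store-1:10901"})
}
```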
2 changes: 2 additions & 0 deletions cmd/thanos/query_frontend.go
@@ -155,6 +155,8 @@ func registerQueryFrontend(app *extkingpin.App) {
cmd.Flag("failed-query-cache-capacity", "Capacity of cache for failed queries. 0 means this feature is disabled.").
Default("0").IntVar(&cfg.CortexHandlerConfig.FailedQueryCacheCapacity)

cmd.Flag("query-frontend.force-query-stats", "Enables query statistics for all queries and will export statistics as logs and service headers.").Default("false").BoolVar(&cfg.CortexHandlerConfig.QueryStatsEnabled)

cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
" and both flags cannot be used at the same time. "+
"Request header names used to identify the source of slow queries (repeated flag). "+
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
@@ -453,7 +453,7 @@ func runRule(
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
resolveCtx, resolveCancel := context.WithTimeout(ctx, 5*time.Second)
defer resolveCancel()
if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints); err != nil {
if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints, true); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses passed using grpc query config", "err", err)
}
return nil
45 changes: 27 additions & 18 deletions cmd/thanos/sidecar.go
@@ -235,6 +235,14 @@ func runSidecar(
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()

if err := m.UpdateTimestamps(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch timestamps. Is Prometheus running? Retrying",
"err", err,
)
return err
}

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
@@ -266,16 +274,21 @@
return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()
if err := m.UpdateTimestamps(iterCtx); err != nil {
level.Warn(logger).Log("msg", "updating timestamps failed", "err", err)
promUp.Set(0)
statusProber.NotReady(err)
return nil
}

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
level.Warn(logger).Log("msg", "updating labels failed", "err", err)
promUp.Set(0)
statusProber.NotReady(err)
} else {
promUp.Set(1)
statusProber.Ready()
return nil
}

promUp.Set(1)
statusProber.Ready()
return nil
})
}, func(error) {
@@ -317,7 +330,7 @@ func runSidecar(
}),
info.WithStoreInfoFunc(func() (*infopb.StoreInfo, error) {
if httpProbe.IsReady() {
mint, maxt := promStore.Timestamps()
mint, maxt := m.Timestamps()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
@@ -409,13 +422,6 @@ func runSidecar(
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

minTime, _, err := s.Timestamps()
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
return nil
}
m.UpdateTimestamps(minTime, math.MaxInt64)
return nil
})
}, func(error) {
@@ -490,16 +496,19 @@ func (s *promMetadata) UpdateLabels(ctx context.Context) error {
return nil
}

func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
func (s *promMetadata) UpdateTimestamps(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

if mint < s.limitMinTime.PrometheusTimestamp() {
mint = s.limitMinTime.PrometheusTimestamp()
mint, err := s.client.LowestTimestamp(ctx, s.promURL)
if err != nil {
return err
}

s.mint = mint
s.maxt = maxt
s.mint = min(s.limitMinTime.PrometheusTimestamp(), mint)
s.maxt = math.MaxInt64

return nil
}

func (s *promMetadata) Labels() labels.Labels {
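The sidecar change above moves timestamp discovery out of the shipper loop: `UpdateTimestamps` now asks Prometheus directly for its lowest timestamp (via the client's `LowestTimestamp` call shown in the hunk) and advertises an unbounded max time. A minimal sketch of the values that end up being stored; `computeTimestamps` is a hypothetical helper, and the limit is assumed to already be a millisecond timestamp (e.g. the sidecar's `--min-time` flag):

```go
package main

import (
	"fmt"
	"math"
)

// computeTimestamps sketches what the reworked promMetadata.UpdateTimestamps
// stores after a successful call: the advertised min time is the smaller of
// the configured limit and the lowest timestamp reported by Prometheus, while
// the max time is left unbounded.
func computeTimestamps(limitMinTime, prometheusLowest int64) (mint, maxt int64) {
	return min(limitMinTime, prometheusLowest), math.MaxInt64
}

func main() {
	mint, maxt := computeTimestamps(1_700_000_000_000, 1_699_990_000_000)
	fmt.Println(mint, maxt) // 1699990000000 9223372036854775807
}
```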
2 changes: 2 additions & 0 deletions cmd/thanos/tools_bucket.go
@@ -870,6 +870,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
ignoreDeletionMarkFilter,
stubCounter,
stubCounter,
0,
)
if err != nil {
return errors.Wrap(err, "create syncer")
@@ -1413,6 +1414,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
ignoreDeletionMarkFilter,
stubCounter,
stubCounter,
0,
)
if err != nil {
return errors.Wrap(err, "create syncer")
4 changes: 4 additions & 0 deletions docs/components/query-frontend.md
@@ -278,6 +278,10 @@ Flags:
functions in query-frontend.
--no-query-frontend.enable-x-functions for
disabling.
--query-frontend.force-query-stats
Enables query statistics for all queries and
will export statistics as logs and service
headers.
--query-frontend.forward-header=<http-header-name> ...
List of headers forwarded by the query-frontend
to downstream queriers, default is empty
4 changes: 2 additions & 2 deletions go.mod
@@ -58,7 +58,7 @@ require (
github.com/prometheus/common v0.60.0
github.com/prometheus/exporter-toolkit v0.12.0
// Prometheus maps version 2.x.y to tags v0.x.y.
github.com/prometheus/prometheus v0.54.2-0.20240920164404-6f0d6038b7f9
github.com/prometheus/prometheus v0.55.1-0.20241102120812-a6fd22b9d2c8
github.com/sony/gobreaker v0.5.0
github.com/stretchr/testify v1.9.0
github.com/thanos-io/objstore v0.0.0-20240913074259-63feed0da069
@@ -222,7 +222,7 @@ require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect
github.com/google/uuid v1.6.0
github.com/googleapis/enterprise-certificate-proxy v0.3.3 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
8 changes: 4 additions & 4 deletions go.sum
@@ -1818,8 +1818,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5
github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/enterprise-certificate-proxy v0.2.5/go.mod h1:RxW0N9901Cko1VOCW3SXCpWP+mlIEkk2tP7jnHy9a3w=
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/enterprise-certificate-proxy v0.3.3 h1:QRje2j5GZimBzlbhGA2V2QlGNgL8G6e+wGo/+/2bWI0=
github.com/googleapis/enterprise-certificate-proxy v0.3.3/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA=
github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw=
github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA=
github.com/googleapis/gax-go v2.0.2+incompatible h1:silFMLAnr330+NRuag/VjIGF7TLp/LBrV2CJKFLWEww=
github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
@@ -2173,8 +2173,8 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/prometheus/prometheus v0.54.2-0.20240920164404-6f0d6038b7f9 h1:Uad9KpJf6hvv2JPb64pDPYUAzssaD+GRJHJMMGMEEaM=
github.com/prometheus/prometheus v0.54.2-0.20240920164404-6f0d6038b7f9/go.mod h1:n3/PY5be8xgYe+DUCjKdK0eSmDSQBQ6ZOoIUGmO9vEY=
github.com/prometheus/prometheus v0.55.1-0.20241102120812-a6fd22b9d2c8 h1:hCxAh6+hxwy7dqUPE5ndnilMeCWrqQkJVjPDXtiYRVo=
github.com/prometheus/prometheus v0.55.1-0.20241102120812-a6fd22b9d2c8/go.mod h1:GGS7QlWKCqCbcEzWsVahYIfQwiGhcExkarHyLJTsv6I=
github.com/redis/rueidis v1.0.45-alpha.1 h1:69Bu1l7gVC/qDYuGGwMwGg2rjOjSyxESol/Zila62gY=
github.com/redis/rueidis v1.0.45-alpha.1/go.mod h1:q7BfhDaPt7xxwy2nv2RqQO12/mmHflDjebpcNwWFjms=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
2 changes: 1 addition & 1 deletion internal/cortex/chunk/cache/memcached_client.go
@@ -251,7 +251,7 @@ func (c *memcachedClient) updateMemcacheServers() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := c.provider.Resolve(ctx, c.addresses); err != nil {
if err := c.provider.Resolve(ctx, c.addresses, true); err != nil {
return err
}
servers = c.provider.Addresses()
50 changes: 48 additions & 2 deletions internal/cortex/frontend/transport/handler.go
@@ -221,7 +221,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if shouldReportSlowQuery {
f.reportSlowQuery(r, hs, queryString, queryBytesFetched, queryResponseTime)
f.reportSlowQuery(r, hs, queryString, queryResponseTime, stats, queryBytesFetched)
}
if shouldReportExpensiveQuery {
f.reportExpensiveQuery(r, queryString, queryBytesFetched, queryResponseTime)
@@ -297,7 +297,14 @@ func isQueryEndpoint(path string) bool {
}

// reportSlowQuery reports slow queries.
func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header, queryString url.Values, queryBytesFetched uint64, queryResponseTime time.Duration) {
func (f *Handler) reportSlowQuery(
r *http.Request,
responseHeaders http.Header,
queryString url.Values,
queryResponseTime time.Duration,
stats *querier_stats.Stats,
queryBytesFetched uint64,
) {
f.slowQueryCount.Inc()
// NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
grafanaDashboardUID := "-"
@@ -335,6 +342,9 @@
"trace_id", thanosTraceID,
}, formatQueryString(queryString)...)

logMessage = addQueryRangeToLogMessage(logMessage, queryString)
logMessage = f.addStatsToLogMessage(logMessage, stats)

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

@@ -368,6 +378,8 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"fetched_series_count", numSeries,
"fetched_chunks_bytes", numBytes,
}, formatQueryString(queryString)...)
logMessage = f.addStatsToLogMessage(logMessage, stats)
logMessage = addQueryRangeToLogMessage(logMessage, queryString)

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}
@@ -393,6 +405,40 @@ func formatQueryString(queryString url.Values) (fields []interface{}) {
return fields
}

func (f *Handler) addStatsToLogMessage(message []interface{}, stats *querier_stats.Stats) []interface{} {
if stats != nil {
message = append(message, "peak_samples", stats.LoadPeakSamples())
message = append(message, "total_samples_loaded", stats.LoadTotalSamples())
}

return message
}

func addQueryRangeToLogMessage(logMessage []interface{}, queryString url.Values) []interface{} {
queryRange := extractQueryRange(queryString)
if queryRange != time.Duration(0) {
logMessage = append(logMessage, "query_range_hours", int(queryRange.Hours()))
logMessage = append(logMessage, "query_range_human", queryRange.String())
}
return logMessage
}

// extractQueryRange extracts query range from query string.
// If start and end are not provided or are invalid, it returns a duration with zero-value.
func extractQueryRange(queryString url.Values) time.Duration {
startStr := queryString.Get("start")
endStr := queryString.Get("end")
var queryRange = time.Duration(0)
if startStr != "" && endStr != "" {
start, serr := util.ParseTime(startStr)
end, eerr := util.ParseTime(endStr)
if serr == nil && eerr == nil {
queryRange = time.Duration(end-start) * time.Millisecond
}
}
return queryRange
}

func writeError(w http.ResponseWriter, err error) {
switch err {
case context.Canceled:
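The new `extractQueryRange` helper above turns the `start` and `end` request parameters into a duration so that slow-query and stats log lines can carry `query_range_hours` and `query_range_human` fields. A self-contained sketch of the same computation; `extractQueryRangeSketch` parses start/end as plain Unix seconds to avoid depending on `util.ParseTime`, which the real code uses and which also accepts RFC 3339 timestamps:

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"time"
)

// extractQueryRangeSketch mirrors the helper added above: an empty or invalid
// start/end pair yields a zero duration, otherwise the range is end - start.
func extractQueryRangeSketch(q url.Values) time.Duration {
	start, serr := strconv.ParseFloat(q.Get("start"), 64)
	end, eerr := strconv.ParseFloat(q.Get("end"), 64)
	if serr != nil || eerr != nil {
		return 0
	}
	return time.Duration((end-start)*1000) * time.Millisecond
}

func main() {
	q := url.Values{}
	q.Set("start", "1730937600") // 2024-11-07 00:00:00 UTC
	q.Set("end", "1730980800")   // 2024-11-07 12:00:00 UTC
	d := extractQueryRangeSketch(q)
	fmt.Println(int(d.Hours()), d.String()) // 12 12h0m0s
}
```

For a 12-hour range like the one above, the log line would gain `query_range_hours=12` and `query_range_human=12h0m0s`.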
15 changes: 14 additions & 1 deletion internal/cortex/querier/queryrange/query_range.go
@@ -483,14 +483,27 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.
httpHeader[QueryBytesFetchedHeaderName] = []string{strconv.FormatInt(res.(*PrometheusResponse).Data.SeriesStatsCounter.Bytes, 10)}
}
resp := http.Response{
Header: httpHeader,
Header: mergeHeaders(a.Headers),
Body: io.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
ContentLength: int64(len(b)),
}
return &resp, nil
}

// mergeHeaders preserves the headers of the original Prometheus response coming from the Tripperware, forcing Content-Type to application/json.
func mergeHeaders(headers []*PrometheusResponseHeader) http.Header {
h := make(http.Header, len(headers)+1)
for _, header := range headers {
if strings.EqualFold("Content-Type", header.Name) {
continue
}
h[header.Name] = header.Values
}
h["Content-Type"] = []string{"application/json"}
return h
}

// UnmarshalJSON implements json.Unmarshaler and is used for unmarshalling
// a Prometheus range query response (matrix).
func (s *SampleStream) UnmarshalJSON(data []byte) error {
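The `mergeHeaders` helper above copies through whatever headers the downstream Prometheus response carried, except `Content-Type`, which is always pinned to `application/json` because the frontend re-encodes the body. A stand-alone sketch of the same behaviour, with a local `responseHeader` struct standing in for `PrometheusResponseHeader` and purely illustrative header values:

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// responseHeader is a stand-in for queryrange.PrometheusResponseHeader.
type responseHeader struct {
	Name   string
	Values []string
}

// mergeHeaders mirrors the helper added above: downstream headers are copied
// through verbatim, except Content-Type, which is dropped and then forced to
// application/json to match the re-encoded body.
func mergeHeaders(headers []*responseHeader) http.Header {
	h := make(http.Header, len(headers)+1)
	for _, header := range headers {
		if strings.EqualFold("Content-Type", header.Name) {
			continue
		}
		h[header.Name] = header.Values
	}
	h["Content-Type"] = []string{"application/json"}
	return h
}

func main() {
	merged := mergeHeaders([]*responseHeader{
		{Name: "X-Example-Stats", Values: []string{"samples=42"}}, // illustrative header
		{Name: "content-type", Values: []string{"application/x-protobuf"}},
	})
	fmt.Println(merged) // Content-Type is application/json; other headers survive
}
```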