Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add possibility to add fanout metadata #490

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,6 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
Expand Down Expand Up @@ -442,6 +441,7 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()

exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
Expand Down
2 changes: 1 addition & 1 deletion execution/aggregate/count_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewCountValues(pool *model.VectorPool, next model.VectorOperator, param str
by: by,
grouping: grouping,
}
op.OperatorTelemetry = model.NewTelemetry(op, opts)
op.OperatorTelemetry = model.NewTelemetry(context.Background(), op, opts)

return op
}
Expand Down
2 changes: 1 addition & 1 deletion execution/aggregate/hashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewHashAggregate(
stepsBatch: opts.StepsBatch,
}

a.OperatorTelemetry = model.NewTelemetry(a, opts)
a.OperatorTelemetry = model.NewTelemetry(context.Background(), a, opts)

return a, nil
}
Expand Down
2 changes: 1 addition & 1 deletion execution/aggregate/khashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewKHashAggregate(
params: make([]float64, opts.StepsBatch),
}

op.OperatorTelemetry = model.NewTelemetry(op, opts)
op.OperatorTelemetry = model.NewTelemetry(context.Background(), op, opts)

return op, nil
}
Expand Down
2 changes: 1 addition & 1 deletion execution/binary/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewScalar(
bothScalars: scalarSide == ScalarSideBoth,
}

oper.OperatorTelemetry = model.NewTelemetry(op, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), op, opts)

return oper, nil

Expand Down
2 changes: 1 addition & 1 deletion execution/binary/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewVectorOperator(
sigFunc: signatureFunc(matching.On, matching.MatchingLabels...),
}

oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)

return oper, nil
}
Expand Down
2 changes: 1 addition & 1 deletion execution/exchange/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewCoalesce(pool *model.VectorPool, opts *query.Options, batchSize int64, o
batchSize: batchSize,
}

oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)

return oper
}
Expand Down
2 changes: 1 addition & 1 deletion execution/exchange/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewConcurrent(next model.VectorOperator, bufferSize int, opts *query.Option
bufferSize: bufferSize,
}

oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)
return oper
}

Expand Down
2 changes: 1 addition & 1 deletion execution/exchange/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewDedupOperator(pool *model.VectorPool, next model.VectorOperator, opts *q
next: next,
pool: pool,
}
oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)

return oper
}
Expand Down
2 changes: 1 addition & 1 deletion execution/exchange/duplicate_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewDuplicateLabelCheck(next model.VectorOperator, opts *query.Options) mode
oper := &duplicateLabelCheckOperator{
next: next,
}
oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)

return oper
}
Expand Down
2 changes: 1 addition & 1 deletion execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func newSubqueryFunction(ctx context.Context, e *logicalplan.FunctionCall, t *lo
}
}

return scan.NewSubqueryOperator(model.NewVectorPool(opts.StepsBatch), inner, scalarArg, &outerOpts, e, t)
return scan.NewSubqueryOperator(ctx, model.NewVectorPool(opts.StepsBatch), inner, scalarArg, &outerOpts, e, t)
}

func newInstantVectorFunction(ctx context.Context, e *logicalplan.FunctionCall, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
Expand Down
2 changes: 1 addition & 1 deletion execution/function/absent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newAbsentOperator(
pool: pool,
next: next,
}
oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)

return oper
}
Expand Down
2 changes: 1 addition & 1 deletion execution/function/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newHistogramOperator(
vectorOp: vectorOp,
scalarPoints: make([]float64, opts.StepsBatch),
}
oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)

return oper
}
Expand Down
4 changes: 2 additions & 2 deletions execution/function/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newNoArgsFunctionOperator(funcExpr *logicalplan.FunctionCall, stepsBatch in
call: call,
vectorPool: model.NewVectorPool(stepsBatch),
}
op.OperatorTelemetry = model.NewTelemetry(op, opts)
op.OperatorTelemetry = model.NewTelemetry(context.Background(), op, opts)

switch funcExpr.Func.Name {
case "pi", "time":
Expand Down Expand Up @@ -112,7 +112,7 @@ func newInstantVectorFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOp
vectorIndex: 0,
scalarPoints: scalarPoints,
}
f.OperatorTelemetry = model.NewTelemetry(f, opts)
f.OperatorTelemetry = model.NewTelemetry(context.Background(), f, opts)

for i := range funcExpr.Args {
if funcExpr.Args[i].ReturnType() == parser.ValueTypeVector {
Expand Down
2 changes: 1 addition & 1 deletion execution/function/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func newRelabelOperator(
next: next,
funcExpr: funcExpr,
}
oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use context from constructor and pass through from engine? This is a bit more work but feels cleaner, we do same with warnings fwiw - if we want to use different kind of metadata someday


return oper
}
Expand Down
2 changes: 1 addition & 1 deletion execution/function/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func newScalarOperator(pool *model.VectorPool, next model.VectorOperator, opts *
next: next,
}

oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)
return oper
}

Expand Down
2 changes: 1 addition & 1 deletion execution/function/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func newTimestampOperator(next model.VectorOperator, opts *query.Options) *times
oper := &timestampOperator{
next: next,
}
oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)

return oper
}
Expand Down
61 changes: 56 additions & 5 deletions execution/model/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package model
import (
"context"
"fmt"
"sync"
"time"

"github.com/prometheus/prometheus/model/labels"
Expand All @@ -14,6 +15,44 @@ import (
"github.com/thanos-io/promql-engine/query"
)

type metadataKey string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have similar pattern in /execution/warning i think, should we move this to /execution/metadata so its similarly structured?


const key metadataKey = "promql-metadata"

func AddMetadataStorage(ctx context.Context) context.Context {
return context.WithValue(ctx, key, &FanoutMetadata{
storage: make(map[string]map[string]any),
})
}

func GetMetadataStorage(ctx context.Context) *FanoutMetadata {
v := ctx.Value(key)
if v == nil {
return nil
}
return v.(*FanoutMetadata)
}

type FanoutMetadata struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could just be "Metadata" as its just a map; applications outside could build on top of this map since "fanout" is somewhat a thanos concept that this engine doesnt know about; its more an implementation detail of Select. Thanos could use this to just add fan out metadata in its Select implementation but no need that this library knows what this key contains really.

m sync.Mutex

storage map[string]map[string]any
}

func (m *FanoutMetadata) SetMetadata(endpoint string, data map[string]any) {
m.m.Lock()
defer m.m.Unlock()

m.storage[endpoint] = data
}

func (m *FanoutMetadata) GetMetadata() map[string]map[string]any {
m.m.Lock()
defer m.m.Unlock()

return m.storage
}

type OperatorTelemetry interface {
fmt.Stringer

Expand All @@ -22,18 +61,20 @@ type OperatorTelemetry interface {
IncrementSamplesAtTimestamp(samples int, t int64)
Samples() *stats.QuerySamples
SubQuery() bool

FanoutMetadata() *FanoutMetadata
}

func NewTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry {
func NewTelemetry(ctx context.Context, operator fmt.Stringer, opts *query.Options) OperatorTelemetry {
if opts.EnableAnalysis {
return NewTrackedTelemetry(operator, opts, false)
return NewTrackedTelemetry(ctx, operator, opts, false)
}
return NewNoopTelemetry(operator)
}

func NewSubqueryTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry {
func NewSubqueryTelemetry(ctx context.Context, operator fmt.Stringer, opts *query.Options) OperatorTelemetry {
if opts.EnableAnalysis {
return NewTrackedTelemetry(operator, opts, true)
return NewTrackedTelemetry(ctx, operator, opts, true)
}
return NewNoopTelemetry(operator)
}
Expand All @@ -52,6 +93,10 @@ func (tm *NoopTelemetry) ExecutionTimeTaken() time.Duration {
return time.Duration(0)
}

func (tm *NoopTelemetry) FanoutMetadata() *FanoutMetadata {
return nil
}

func (tm *NoopTelemetry) IncrementSamplesAtTimestamp(_ int, _ int64) {}

func (tm *NoopTelemetry) Samples() *stats.QuerySamples { return nil }
Expand All @@ -63,15 +108,17 @@ type TrackedTelemetry struct {
ExecutionTime time.Duration
LoadedSamples *stats.QuerySamples
subquery bool
ctx context.Context
}

func NewTrackedTelemetry(operator fmt.Stringer, opts *query.Options, subquery bool) *TrackedTelemetry {
func NewTrackedTelemetry(ctx context.Context, operator fmt.Stringer, opts *query.Options, subquery bool) *TrackedTelemetry {
ss := stats.NewQuerySamples(opts.EnablePerStepStats)
ss.InitStepTracking(opts.Start.UnixMilli(), opts.End.UnixMilli(), stepTrackingInterval(opts.Step))
return &TrackedTelemetry{
Stringer: operator,
LoadedSamples: ss,
subquery: subquery,
ctx: ctx,
}
}

Expand All @@ -88,6 +135,10 @@ func (ti *TrackedTelemetry) ExecutionTimeTaken() time.Duration {
return ti.ExecutionTime
}

func (ti *TrackedTelemetry) FanoutMetadata() *FanoutMetadata {
return GetMetadataStorage(ti.ctx)
}

func (ti *TrackedTelemetry) IncrementSamplesAtTimestamp(samples int, t int64) {
ti.updatePeak(samples)
ti.LoadedSamples.IncrementSamplesAtTimestamp(t, int64(samples))
Expand Down
1 change: 1 addition & 0 deletions execution/noop/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type operator struct {

func NewOperator(opts *query.Options) model.VectorOperator {
scanner := prometheus.NewVectorSelector(
context.Background(),
model.NewVectorPool(0),
noopSelector{},
opts,
Expand Down
4 changes: 2 additions & 2 deletions execution/remote/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart ti
query: query,
opts: opts,
queryRangeStart: queryRangeStart,
vectorSelector: promstorage.NewVectorSelector(pool, storage, opts, 0, 0, false, 0, 1),
vectorSelector: promstorage.NewVectorSelector(context.Background(), pool, storage, opts, 0, 0, false, 0, 1),
}

oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)

return oper
}
Expand Down
2 changes: 1 addition & 1 deletion execution/scan/literal_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val f
val: val,
}

oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = model.NewTelemetry(context.Background(), oper, opts)
return oper
}

Expand Down
4 changes: 2 additions & 2 deletions execution/scan/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type subqueryOperator struct {
params []float64
}

func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
func NewSubqueryOperator(ctx context.Context, pool *model.VectorPool, next, paramOp model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
call, err := ringbuffer.NewRangeVectorFunc(funcExpr.Func.Name)
if err != nil {
return nil, err
Expand All @@ -74,7 +74,7 @@ func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOpera
lastCollected: -1,
params: make([]float64, opts.StepsBatch),
}
o.OperatorTelemetry = model.NewSubqueryTelemetry(o, opts)
o.OperatorTelemetry = model.NewSubqueryTelemetry(ctx, o, opts)

return o, nil
}
Expand Down
2 changes: 1 addition & 1 deletion execution/step_invariant/step_invariant.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewStepInvariantOperator(
stepsBatch: opts.StepsBatch,
cacheResult: true,
}
u.OperatorTelemetry = model.NewTelemetry(u, opts)
u.OperatorTelemetry = model.NewTelemetry(context.Background(), u, opts)
if u.step == 0 {
u.step = 1
}
Expand Down
2 changes: 1 addition & 1 deletion execution/unary/unary.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewUnaryNegation(next model.VectorOperator, opts *query.Options) (model.Vec
u := &unaryNegation{
next: next,
}
u.OperatorTelemetry = model.NewTelemetry(u, opts)
u.OperatorTelemetry = model.NewTelemetry(context.Background(), u, opts)

return u, nil
}
Expand Down
3 changes: 2 additions & 1 deletion storage/prometheus/matrix_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var ErrNativeHistogramsNotSupported = errors.New("native histograms are not supp

// NewMatrixSelector creates operator which selects vector of series over time.
func NewMatrixSelector(
ctx context.Context,
pool *model.VectorPool,
selector SeriesSelector,
functionName string,
Expand Down Expand Up @@ -109,7 +110,7 @@ func NewMatrixSelector(

extLookbackDelta: opts.ExtLookbackDelta.Milliseconds(),
}
m.OperatorTelemetry = model.NewTelemetry(m, opts)
m.OperatorTelemetry = model.NewTelemetry(ctx, m, opts)

// For instant queries, set the step to a positive value
// so that the operator can terminate.
Expand Down
8 changes: 0 additions & 8 deletions storage/prometheus/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@ func NewSelectorPool(querier storage.Querier) *SelectorPool {
}
}

func (p *SelectorPool) GetSelector(mint, maxt, step int64, matchers []*labels.Matcher, hints storage.SelectHints) SeriesSelector {
key := hashMatchers(matchers, mint, maxt, hints)
if _, ok := p.selectors[key]; !ok {
p.selectors[key] = newSeriesSelector(p.querier, matchers, hints)
}
return p.selectors[key]
}

func (p *SelectorPool) GetFilteredSelector(mint, maxt, step int64, matchers, filters []*labels.Matcher, hints storage.SelectHints) SeriesSelector {
key := hashMatchers(matchers, mint, maxt, hints)
if _, ok := p.selectors[key]; !ok {
Expand Down
Loading
Loading