Skip to content

Commit

Permalink
Use OutOfOrderTimeWindow
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Rabenhorst <[email protected]>
  • Loading branch information
rabenhorst committed May 17, 2024
1 parent e5fd5c7 commit 31c67aa
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 43 deletions.
6 changes: 2 additions & 4 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,6 @@ func main() {
agentOnlyFlag(a, "storage.agent.no-lockfile", "Do not create lockfile in data directory.").
Default("false").BoolVar(&cfg.agent.NoLockfile)

agentOnlyFlag(a, "storage.agent.reject-out-of-order-samples", "Reject out-of-order samples.").
Default("false").BoolVar(&cfg.agent.RejectOOOSamples)

a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)

Expand Down Expand Up @@ -1183,6 +1180,7 @@ func main() {
if agentMode {
// WAL storage.
opts := cfg.agent.ToAgentOptions()
opts.SetOutOfOrderTimeWindow(cfg.tsdb.OutOfOrderTimeWindow)
cancel := make(chan struct{})
g.Add(
func() error {
Expand Down Expand Up @@ -1712,6 +1710,7 @@ type agentOptions struct {
MinWALTime, MaxWALTime model.Duration
NoLockfile bool
RejectOOOSamples bool
OutOfOrderTimeWindow int64
}

func (opts agentOptions) ToAgentOptions() agent.Options {
Expand All @@ -1723,7 +1722,6 @@ func (opts agentOptions) ToAgentOptions() agent.Options {
MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)),
MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
NoLockfile: opts.NoLockfile,
RejectOOOSamples: opts.RejectOOOSamples,
}
}

Expand Down
1 change: 0 additions & 1 deletion docs/command-line/prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ The Prometheus monitoring server
| <code class="text-nowrap">--storage.agent.retention.min-time</code> | Minimum age samples may be before being considered for deletion when the WAL is truncated Use with agent mode only. | |
| <code class="text-nowrap">--storage.agent.retention.max-time</code> | Maximum age samples may be before being forcibly deleted when the WAL is truncated Use with agent mode only. | |
| <code class="text-nowrap">--storage.agent.no-lockfile</code> | Do not create lockfile in data directory. Use with agent mode only. | `false` |
| <code class="text-nowrap">--storage.agent.reject-out-of-order-samples</code> | Reject out-of-order samples. Use with agent mode only. | `false` |
| <code class="text-nowrap">--storage.remote.flush-deadline</code> | How long to wait flushing sample on shutdown or config reload. | `1m` |
| <code class="text-nowrap">--storage.remote.read-sample-limit</code> | Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types. Use with server mode only. | `5e7` |
| <code class="text-nowrap">--storage.remote.read-concurrent-limit</code> | Maximum number of concurrent remote read calls. 0 means no limit. Use with server mode only. | `10` |
Expand Down
41 changes: 29 additions & 12 deletions tsdb/agent/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,29 @@ type Options struct {
// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool

// RejectOOOSamples enables rejecting out of order samples.
RejectOOOSamples bool
// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
OutOfOrderTimeWindow int64
}

func (o *Options) SetOutOfOrderTimeWindow(outOfOrderTimeWindow int64) {
if outOfOrderTimeWindow < 0 {
return
}
o.OutOfOrderTimeWindow = outOfOrderTimeWindow
}

// DefaultOptions used for the WAL storage. They are reasonable for setups using
// millisecond-precision timestamps.
func DefaultOptions() *Options {
return &Options{
WALSegmentSize: wlog.DefaultSegmentSize,
WALCompression: wlog.CompressionNone,
StripeSize: tsdb.DefaultStripeSize,
TruncateFrequency: DefaultTruncateFrequency,
MinWALTime: DefaultMinWALTime,
MaxWALTime: DefaultMaxWALTime,
NoLockfile: false,
RejectOOOSamples: false,
WALSegmentSize: wlog.DefaultSegmentSize,
WALCompression: wlog.CompressionNone,
StripeSize: tsdb.DefaultStripeSize,
TruncateFrequency: DefaultTruncateFrequency,
MinWALTime: DefaultMinWALTime,
MaxWALTime: DefaultMaxWALTime,
NoLockfile: false,
OutOfOrderTimeWindow: 0,
}
}

Expand Down Expand Up @@ -816,7 +823,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
series.Lock()
defer series.Unlock()

if a.opts.RejectOOOSamples && t < series.lastTs {
if t <= a.minTs(series.lastTs) {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}
Expand Down Expand Up @@ -944,7 +951,7 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
series.Lock()
defer series.Unlock()

if a.opts.RejectOOOSamples && t < series.lastTs {
if t <= a.minTs(series.lastTs) {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}
Expand Down Expand Up @@ -1117,3 +1124,13 @@ func (a *appender) logSeries() error {

return nil
}

// mintTs returns the minimum timestamp that a sample can have
// and is needed for preventing underflow.
func (a *appender) minTs(lastTs int64) int64 {
if lastTs < math.MinInt64+a.opts.OutOfOrderTimeWindow {
return math.MinInt64
}

return lastTs - a.opts.OutOfOrderTimeWindow
}
79 changes: 53 additions & 26 deletions tsdb/agent/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package agent
import (
"context"
"fmt"
"math"
"path/filepath"
"strconv"
"testing"
Expand Down Expand Up @@ -761,7 +762,9 @@ func TestDBAllowOOOSamples(t *testing.T) {
)

reg := prometheus.NewRegistry()
s := createTestAgentDB(t, reg, DefaultOptions())
opts := DefaultOptions()
opts.SetOutOfOrderTimeWindow(math.MaxInt64)
s := createTestAgentDB(t, reg, opts)
app := s.Appender(context.TODO())

// Let's add some samples in the [offset, offset+numDatapoints) range.
Expand Down Expand Up @@ -880,34 +883,58 @@ func TestDBAllowOOOSamples(t *testing.T) {
}

func TestDBRejectOOOSamples(t *testing.T) {
reg := prometheus.NewRegistry()
opts := DefaultOptions()
opts.RejectOOOSamples = true
s := createTestAgentDB(t, reg, opts)
app := s.Appender(context.TODO())
tc := []struct {
outOfOrderTimeWindow, firstTs, secondTs int64
reject bool
}{
{0, 100, 101, false},
{0, 100, 100, true},
{0, 100, 99, true},
{100, 100, 1, false},
{100, 100, 0, true},
}

for _, c := range tc {
t.Run(fmt.Sprintf("outOfOrderTimeWindow=%d, firstTs=%d, secondTs=%d, reject=%t", c.outOfOrderTimeWindow, c.firstTs, c.secondTs, c.reject), func(t *testing.T) {
reg := prometheus.NewRegistry()
opts := DefaultOptions()
opts.SetOutOfOrderTimeWindow(c.outOfOrderTimeWindow)
s := createTestAgentDB(t, reg, opts)
app := s.Appender(context.TODO())

var expectedErr error
if c.reject {
expectedErr = storage.ErrOutOfOrderSample
}

lbls := labelsForTest(t.Name()+"_histogram", 1)
lset := labels.New(lbls[0]...)
_, err := app.AppendHistogram(0, lset, 1, tsdbutil.GenerateTestHistograms(1)[0], nil)
require.NoError(t, err)
err = app.Commit()
require.NoError(t, err)
_, err = app.AppendHistogram(0, lset, 0, tsdbutil.GenerateTestHistograms(1)[0], nil)
require.ErrorIs(t, err, storage.ErrOutOfOrderSample, "should reject OOO samples")
lbls := labelsForTest(t.Name()+"_histogram", 1)
lset := labels.New(lbls[0]...)
_, err := app.AppendHistogram(0, lset, c.firstTs, tsdbutil.GenerateTestHistograms(1)[0], nil)
require.NoError(t, err)
err = app.Commit()
require.NoError(t, err)
_, err = app.AppendHistogram(0, lset, c.secondTs, tsdbutil.GenerateTestHistograms(1)[0], nil)
require.ErrorIs(t, err, expectedErr)

lbls = labelsForTest(t.Name()+"_histogram", 1)
lset = labels.New(lbls[0]...)
_, err = app.Append(0, lset, 1, 0)
require.NoError(t, err)
err = app.Commit()
require.NoError(t, err)
_, err = app.Append(0, lset, 0, 0)
require.ErrorIs(t, err, storage.ErrOutOfOrderSample, "should reject OOO samples")
lbls = labelsForTest(t.Name(), 1)
lset = labels.New(lbls[0]...)
_, err = app.Append(0, lset, c.firstTs, 0)
require.NoError(t, err)
err = app.Commit()
require.NoError(t, err)
_, err = app.Append(0, lset, c.secondTs, 0)
require.ErrorIs(t, err, expectedErr)

m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
require.Equal(t, float64(1), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, float64(1), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, s.Close())
expectedAppendedSamples := float64(2)
if c.reject {
expectedAppendedSamples = 1
}
m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
require.Equal(t, expectedAppendedSamples, m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, expectedAppendedSamples, m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, s.Close())
})
}
}

func BenchmarkCreateSeries(b *testing.B) {
Expand Down

0 comments on commit 31c67aa

Please sign in to comment.