Skip to content

Commit

Permalink
per tenant retention (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnyi authored Dec 17, 2024
2 parents 2fecf4d + 2cc4540 commit 95cb360
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 0 deletions.
12 changes: 12 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ func runCompact(
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

retentionByTenant, err := compact.ParesRetentionPolicyByTenant(logger, *conf.retentionTenants)
if err != nil {
level.Error(logger).Log("msg", "failed to parse retention policy by tenant", "err", err)
return err
}

var cleanMtx sync.Mutex
// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
cleanPartialMarked := func() error {
Expand Down Expand Up @@ -534,6 +540,10 @@ func runCompact(
return errors.Wrap(err, "sync before retention")
}

if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, metadata.TenantRetentionExpired)); err != nil {
return errors.Wrap(err, "retention by tenant failed")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, insBkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
return errors.Wrap(err, "retention failed")
}
Expand Down Expand Up @@ -726,6 +736,7 @@ type compactConfig struct {
objStore extflag.PathOrContent
consistencyDelay time.Duration
retentionRaw, retentionFiveMin, retentionOneHr model.Duration
retentionTenants *[]string
wait bool
waitInterval time.Duration
disableDownsampling bool
Expand Down Expand Up @@ -781,6 +792,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("0d").SetValue(&cc.retentionFiveMin)
cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever").
Default("0d").SetValue(&cc.retentionOneHr)
cc.retentionTenants = cmd.Flag("retention.tenant", "How long to retain samples in bucket per tenant. Setting this to 0d will retain samples of this resolution forever").Strings()

// TODO(kakkoyun, pgough): https://github.com/thanos-io/thanos/issues/2266.
cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Expand Down
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ const (
OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
// DownsampleVerticalCompactionNoCompactReason is a reason to not compact overlapping downsampled blocks as it does not make sense e.g. how to vertically compact the average.
DownsampleVerticalCompactionNoCompactReason = "downsample-vertical-compaction"
// TenantRetentionExpired is a reason to delete block as it's per tenant retention is expired.
TenantRetentionExpired = "tenant-retention-expired"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
Expand Down
84 changes: 84 additions & 0 deletions pkg/compact/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,27 @@ package compact
import (
"context"
"fmt"
"regexp"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
)

const (
// tenantRetentionRegex is the regex pattern for parsing tenant retention.
// valid format is `<tenant>:(<yyyy-mm-dd>|<duration>d)` where <duration> > 0.
tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))$`
)

// ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime.
// A value of 0 disables the retention for its resolution.
func ApplyRetentionPolicyByResolution(
Expand Down Expand Up @@ -47,3 +55,79 @@ func ApplyRetentionPolicyByResolution(
level.Info(logger).Log("msg", "optional retention apply done")
return nil
}

type RetentionPolicy struct {
CutoffDate time.Time
RetentionDuration time.Duration
}

func (r RetentionPolicy) isExpired(blockMaxTime time.Time) bool {
if r.CutoffDate.IsZero() {
return time.Now().After(blockMaxTime.Add(r.RetentionDuration))
}
return r.CutoffDate.After(blockMaxTime)
}

func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) (map[string]RetentionPolicy, error) {
pattern := regexp.MustCompile(tenantRetentionRegex)
retentionByTenant := make(map[string]RetentionPolicy, len(retentionTenants))
for _, tenantRetention := range retentionTenants {
matches := pattern.FindStringSubmatch(tenantRetention)
invalidFormat := errors.Errorf("invalid retention format for tenant: %s, must be `<tenant>:(<yyyy-mm-dd>|<duration>d)`", tenantRetention)
if len(matches) != 5 {
return nil, errors.Wrapf(invalidFormat, "matched size %d", len(matches))
}
tenant := matches[1]
var policy RetentionPolicy
if _, ok := retentionByTenant[tenant]; ok {
return nil, errors.Errorf("duplicate retention policy for tenant: %s", tenant)
}
if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); matches[3] != "" {
if err != nil {
return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err)
}
policy.CutoffDate = cutoffDate
}
if duration, err := model.ParseDuration(matches[4]); matches[4] != "" {
if err != nil {
return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err)
} else if duration == 0 {
return nil, errors.Wrapf(invalidFormat, "duration must be greater than 0")
}
policy.RetentionDuration = time.Duration(duration)
}
level.Info(logger).Log("msg", "retention policy for tenant is enabled", "tenant", tenant, "retention policy", fmt.Sprintf("%v", policy))
retentionByTenant[tenant] = policy
}
return retentionByTenant, nil
}

// ApplyRetentionPolicyByTenant removes blocks depending on the specified retentionByTenant based on blocks MaxTime.
func ApplyRetentionPolicyByTenant(
ctx context.Context,
logger log.Logger,
bkt objstore.Bucket,
metas map[ulid.ULID]*metadata.Meta,
retentionByTenant map[string]RetentionPolicy,
blocksMarkedForDeletion prometheus.Counter) error {
if len(retentionByTenant) == 0 {
level.Info(logger).Log("msg", "tenant retention is disabled due to no policy")
return nil
}
level.Info(logger).Log("msg", "start tenant retention")
for id, m := range metas {
policy, ok := retentionByTenant[m.Thanos.GetTenant()]
if !ok {
continue
}
maxTime := time.Unix(m.MaxTime/1000, 0)
if policy.isExpired(maxTime) {
level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String())
if err := block.MarkForDeletion(ctx, logger, bkt, id, fmt.Sprintf("block exceeding retention of %v", policy), blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, "delete block")
}
}
}
level.Info(logger).Log("msg", "tenant retention apply done")
return nil
}
Loading

0 comments on commit 95cb360

Please sign in to comment.