Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: fix a bug where force GC wasn't respected #24456

Merged
merged 6 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/24456.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fix bug where forced garbage collection does not ignore GC thresholds
```
123 changes: 60 additions & 63 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand All @@ -28,51 +29,50 @@ type CoreScheduler struct {
snap *state.StateSnapshot
logger log.Logger

// custom GC Threshold values can be used by unit tests to simulate time
// manipulation
customJobGCThreshold time.Duration
customEvalGCThreshold time.Duration
customBatchEvalGCThreshold time.Duration
customNodeGCThreshold time.Duration
customDeploymentGCThreshold time.Duration
customCSIVolumeClaimGCThreshold time.Duration
customCSIPluginGCThreshold time.Duration
customACLTokenExpirationGCThreshold time.Duration
customRootKeyGCThreshold time.Duration
// customThresholdForObject is used by unit tests that want to manipulate GC
// threshold settings. Tests add an entry keyed by the string that matches the
// object to GC (e.g., structs.CoreJobEvalGC) whose value points to the
// time.Duration to use as the GC threshold. Objects without an entry keep
// the threshold from the server configuration.
customThresholdForObject map[string]*time.Duration
}

// NewCoreScheduler is used to return a new system scheduler instance
func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Scheduler {
s := &CoreScheduler{
srv: srv,
snap: snap,
logger: srv.logger.ResetNamed("core.sched"),
srv: srv,
snap: snap,
logger: srv.logger.ResetNamed("core.sched"),
customThresholdForObject: make(map[string]*time.Duration),
}
return s
}

// Process is used to implement the scheduler.Scheduler interface
func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
job := strings.Split(eval.JobID, ":") // extra data can be smuggled in w/ JobID

// check if there are any custom threshold values set
customThreshold := c.customThresholdForObject[job[0]]

switch job[0] {
case structs.CoreJobEvalGC:
return c.evalGC()
return c.evalGC(customThreshold)
case structs.CoreJobNodeGC:
return c.nodeGC(eval)
return c.nodeGC(eval, customThreshold)
case structs.CoreJobJobGC:
return c.jobGC(eval)
return c.jobGC(eval, customThreshold)
case structs.CoreJobDeploymentGC:
return c.deploymentGC()
return c.deploymentGC(customThreshold)
case structs.CoreJobCSIVolumeClaimGC:
return c.csiVolumeClaimGC(eval)
return c.csiVolumeClaimGC(eval, customThreshold)
case structs.CoreJobCSIPluginGC:
return c.csiPluginGC(eval)
return c.csiPluginGC(eval, customThreshold)
case structs.CoreJobOneTimeTokenGC:
return c.expiredOneTimeTokenGC(eval)
case structs.CoreJobLocalTokenExpiredGC:
return c.expiredACLTokenGC(eval, false)
return c.expiredACLTokenGC(eval, false, customThreshold)
case structs.CoreJobGlobalTokenExpiredGC:
return c.expiredACLTokenGC(eval, true)
return c.expiredACLTokenGC(eval, true, customThreshold)
case structs.CoreJobRootKeyRotateOrGC:
return c.rootKeyRotateOrGC(eval)
case structs.CoreJobVariablesRekey:
Expand All @@ -86,40 +86,44 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error {

// forceGC is used to garbage collect all eligible objects.
func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
if err := c.jobGC(eval); err != nil {
// set a minimal threshold for all objects to make force GC possible
force := pointer.Of(time.Millisecond)

if err := c.jobGC(eval, force); err != nil {
return err
}
if err := c.evalGC(); err != nil {
if err := c.evalGC(force); err != nil {
return err
}
if err := c.deploymentGC(); err != nil {
if err := c.deploymentGC(force); err != nil {
return err
}
if err := c.csiPluginGC(eval); err != nil {
if err := c.csiPluginGC(eval, force); err != nil {
return err
}
if err := c.csiVolumeClaimGC(eval); err != nil {
if err := c.csiVolumeClaimGC(eval, force); err != nil {
return err
}
if err := c.expiredOneTimeTokenGC(eval); err != nil {
return err
}
if err := c.expiredACLTokenGC(eval, false); err != nil {
if err := c.expiredACLTokenGC(eval, false, force); err != nil {
return err
}
if err := c.expiredACLTokenGC(eval, true); err != nil {
if err := c.expiredACLTokenGC(eval, true, force); err != nil {
return err
}
if err := c.rootKeyGC(eval, time.Now()); err != nil {
return err
}

// Node GC must occur after the others to ensure the allocations are
// cleared.
return c.nodeGC(eval)
return c.nodeGC(eval, force)
}

// jobGC is used to garbage collect eligible jobs.
func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
func (c *CoreScheduler) jobGC(eval *structs.Evaluation, customThreshold *time.Duration) error {
// Get all the jobs eligible for garbage collection.
ws := memdb.NewWatchSet()
iter, err := c.snap.JobsByGC(ws, true)
Expand All @@ -131,8 +135,8 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
threshold = c.srv.config.JobGCThreshold

// custom threshold override
if c.customJobGCThreshold != 0 {
threshold = c.customJobGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}

cutoffTime := c.getCutoffTime(threshold)
Expand Down Expand Up @@ -263,7 +267,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string,
}

// evalGC is used to garbage collect old evaluations
func (c *CoreScheduler) evalGC() error {
func (c *CoreScheduler) evalGC(customThreshold *time.Duration) error {
// Iterate over the evaluations
ws := memdb.NewWatchSet()
iter, err := c.snap.Evals(ws, false)
Expand All @@ -276,11 +280,9 @@ func (c *CoreScheduler) evalGC() error {
batchThreshold = c.srv.config.BatchEvalGCThreshold

// custom threshold override
if c.customEvalGCThreshold != 0 {
threshold = c.customEvalGCThreshold
}
if c.customBatchEvalGCThreshold != 0 {
batchThreshold = c.customBatchEvalGCThreshold
if customThreshold != nil {
threshold = *customThreshold
batchThreshold = *customThreshold
}

cutoffTime := c.getCutoffTime(threshold)
Expand Down Expand Up @@ -376,8 +378,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, cutoffTime time.Time, a
var gcAllocIDs []string
for _, alloc := range allocs {
if !allocGCEligible(alloc, job, time.Now().UTC(), cutoffTime) {
// Can't GC the evaluation since not all of the allocations are
// terminal
// Can't GC the evaluation since not all the allocations are terminal
gcEval = false
} else {
// The allocation is eligible to be GC'd
Expand Down Expand Up @@ -462,7 +463,7 @@ func (c *CoreScheduler) partitionEvalReap(evals, allocs []string, batchSize int)
}

// nodeGC is used to garbage collect old nodes
func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
func (c *CoreScheduler) nodeGC(eval *structs.Evaluation, customThreshold *time.Duration) error {
// Iterate over the nodes
ws := memdb.NewWatchSet()
iter, err := c.snap.Nodes(ws)
Expand All @@ -474,8 +475,8 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
threshold = c.srv.config.NodeGCThreshold

// custom threshold override
if c.customNodeGCThreshold != 0 {
threshold = c.customNodeGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}
cutoffTime := c.getCutoffTime(threshold)

Expand Down Expand Up @@ -566,7 +567,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
}

// deploymentGC is used to garbage collect old deployments
func (c *CoreScheduler) deploymentGC() error {
func (c *CoreScheduler) deploymentGC(customThreshold *time.Duration) error {
// Iterate over the deployments
ws := memdb.NewWatchSet()
iter, err := c.snap.Deployments(ws, state.SortDefault)
Expand All @@ -578,8 +579,8 @@ func (c *CoreScheduler) deploymentGC() error {
threshold = c.srv.config.DeploymentGCThreshold

// custom threshold override
if c.customDeploymentGCThreshold != 0 {
threshold = c.customDeploymentGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}
cutoffTime := c.getCutoffTime(threshold)

Expand Down Expand Up @@ -739,7 +740,7 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime, cutoffTime
}

// csiVolumeClaimGC is used to garbage collect CSI volume claims
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation, customThreshold *time.Duration) error {

gcClaims := func(ns, volID string) error {
req := &structs.CSIVolumeClaimRequest{
Expand Down Expand Up @@ -778,8 +779,8 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
threshold = c.srv.config.CSIVolumeClaimGCThreshold

// custom threshold override
if c.customCSIVolumeClaimGCThreshold != 0 {
threshold = c.customCSIVolumeClaimGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}
cutoffTime := c.getCutoffTime(threshold)

Expand Down Expand Up @@ -812,7 +813,7 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
}

// csiPluginGC is used to garbage collect unused plugins
func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error {
func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation, customThreshold *time.Duration) error {

ws := memdb.NewWatchSet()

Expand All @@ -825,8 +826,8 @@ func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error {
threshold = c.srv.config.CSIPluginGCThreshold

// custom threshold override
if c.customCSIPluginGCThreshold != 0 {
threshold = c.customCSIPluginGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}
cutoffTime := c.getCutoffTime(threshold)

Expand Down Expand Up @@ -870,7 +871,7 @@ func (c *CoreScheduler) expiredOneTimeTokenGC(eval *structs.Evaluation) error {
// tokens. It can be used for both local and global tokens and includes
// behaviour to account for periodic and user actioned garbage collection
// invocations.
func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool) error {
func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool, customThreshold *time.Duration) error {

// If ACLs are not enabled, we do not need to continue and should exit
// early. This is not an error condition as callers can blindly call this
Expand All @@ -893,8 +894,8 @@ func (c *CoreScheduler) expiredACLTokenGC(eval *structs.Evaluation, global bool)
threshold = c.srv.config.ACLTokenExpirationGCThreshold

// custom threshold override
if c.customACLTokenExpirationGCThreshold != 0 {
threshold = c.customACLTokenExpirationGCThreshold
if customThreshold != nil {
threshold = *customThreshold
}
cutoffTime := c.getCutoffTime(threshold)

Expand Down Expand Up @@ -1003,13 +1004,9 @@ func (c *CoreScheduler) rootKeyGC(eval *structs.Evaluation, now time.Time) error
return err
}

var threshold time.Duration
threshold = c.srv.config.RootKeyGCThreshold

// custom threshold override
if c.customRootKeyGCThreshold != 0 {
threshold = c.customRootKeyGCThreshold
}
// we don't do custom overrides for root keys because they are never subject to
// force GC
threshold := c.srv.config.RootKeyGCThreshold

// the threshold is longer than we can support with the time table, and we
// never want to force-GC keys because that will orphan signed Workload
Expand Down
8 changes: 3 additions & 5 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {

// set a shorter GC threshold this time
gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
core.(*CoreScheduler).customBatchEvalGCThreshold = time.Minute
//core.(*CoreScheduler).customEvalGCThreshold = time.Minute
//core.(*CoreScheduler).customJobGCThreshold = time.Minute
core.(*CoreScheduler).customThresholdForObject[structs.CoreJobEvalGC] = pointer.Of(time.Minute)
must.NoError(t, core.Process(gc))

// We expect the following:
Expand Down Expand Up @@ -2513,7 +2511,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
require.NoError(t, c.csiVolumeClaimGC(gc))
require.NoError(t, c.csiVolumeClaimGC(gc, nil))

// the only remaining claim is for a deleted alloc with no path to
// the non-existent node, so volumewatcher will release the
Expand Down Expand Up @@ -2551,7 +2549,7 @@ func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) {
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
must.NoError(t, c.csiVolumeClaimGC(gc))
must.NoError(t, c.csiVolumeClaimGC(gc, nil))

vol, err := srv.State().CSIVolumeByID(nil, structs.DefaultNamespace, "csi-volume-nfs0")
must.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions nomad/system_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Stop = true
// submit time must be older than default job GC
job.SubmitTime = time.Now().Add(-6 * time.Hour).UnixNano()
// set submit time older than now but still newer than default GC threshold
job.SubmitTime = time.Now().Add(-10 * time.Millisecond).UnixNano()
must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job))

eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.JobID = job.ID
// modify time must be older than default eval GC
eval.ModifyTime = time.Now().Add(-5 * time.Hour).UnixNano()
// set modify time older than now but still newer than default GC threshold
eval.ModifyTime = time.Now().Add(-10 * time.Millisecond).UnixNano()
must.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval}))

// Make the GC request
Expand Down