From ff4ab14063c114b6bed170fc98e20c76065878d6 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Tue, 19 Nov 2024 14:53:28 -0500
Subject: [PATCH] dynamic host volumes: node selection via constraints

When making a request to create a dynamic host volume, users can pass a
node pool and constraints instead of a specific node ID. This changeset
implements the node selection logic by instantiating a node pool filter
and a constraint checker borrowed from the scheduler package.

Because host volumes with the same name can't land on the same host, we
don't need to support `distinct_hosts`/`distinct_property`; supporting
those would be challenging anyway without building out a much larger node
iteration mechanism to track usage across multiple hosts.

Ref: https://github.com/hashicorp/nomad/pull/24479
---
 nomad/host_volume_endpoint.go      | 103 +++++++++++++++++++------
 nomad/host_volume_endpoint_test.go | 118 +++++++++++++++++++++++++++++
 nomad/structs/host_volumes.go      |   6 ++
 scheduler/context.go               |   9 +++
 scheduler/feasible.go              |  32 ++++----
 5 files changed, 230 insertions(+), 38 deletions(-)

diff --git a/nomad/host_volume_endpoint.go b/nomad/host_volume_endpoint.go
index 3998dabc3f2..e14c55b246b 100644
--- a/nomad/host_volume_endpoint.go
+++ b/nomad/host_volume_endpoint.go
@@ -6,6 +6,7 @@ package nomad
 import (
 	"fmt"
 	"net/http"
+	"regexp"
 	"strings"
 	"time"
 
@@ -19,6 +20,7 @@ import (
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/state/paginator"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/scheduler"
 )
 
 // HostVolume is the server RPC endpoint for host volumes
@@ -425,28 +427,11 @@ func (v *HostVolume) validateVolumeForState(vol *structs.HostVolume, snap *state
 
 func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
 
-	// TODO(1.10.0): proper node selection based on constraints and node
-	// pool. Also, should we move this into the validator step?
-	if vol.NodeID == "" {
-		var iter memdb.ResultIterator
-		var err error
-		var raw any
-		if vol.NodePool != "" {
-			iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool)
-		} else {
-			iter, err = v.srv.State().Nodes(nil)
-		}
-		if err != nil {
-			return err
-		}
-		raw = iter.Next()
-		if raw == nil {
-			return fmt.Errorf("no node meets constraints for volume")
-		}
-
-		node := raw.(*structs.Node)
-		vol.NodeID = node.ID
+	node, err := v.placeHostVolume(vol)
+	if err != nil {
+		return fmt.Errorf("could not place volume %q: %w", vol.Name, err)
 	}
+	vol.NodeID = node.ID
 
 	method := "ClientHostVolume.Create"
 	cReq := &cstructs.ClientHostVolumeCreateRequest{
@@ -459,7 +444,7 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
 		Parameters: vol.Parameters,
 	}
 	cResp := &cstructs.ClientHostVolumeCreateResponse{}
-	err := v.srv.RPC(method, cReq, cResp)
+	err = v.srv.RPC(method, cReq, cResp)
 	if err != nil {
 		return err
 	}
@@ -474,6 +459,80 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
 	return nil
 }
 
+// placeHostVolume finds a node that matches the volume's node pool and
+// constraints and doesn't already have a volume of the same name. It returns
+// a non-nil Node or an error indicating why placement failed.
+func (v *HostVolume) placeHostVolume(vol *structs.HostVolume) (*structs.Node, error) { + + var iter memdb.ResultIterator + var err error + if vol.NodePool != "" { + iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool) + } else { + iter, err = v.srv.State().Nodes(nil) + } + if err != nil { + return nil, err + } + + var checker *scheduler.ConstraintChecker + + if len(vol.Constraints) > 0 { + ctx := &placementContext{ + regexpCache: make(map[string]*regexp.Regexp), + versionCache: make(map[string]scheduler.VerConstraints), + semverCache: make(map[string]scheduler.VerConstraints), + } + checker = scheduler.NewConstraintChecker(ctx, vol.Constraints) + } + + for { + raw := iter.Next() + if raw == nil { + break + } + candidate := raw.(*structs.Node) + + // note: this is a race if multiple users create volumes of the same + // name concurrently, but we can't solve it on the server because we + // haven't yet written to state. The client will reject requests to + // create/register a volume with the same name with a different ID. + if _, hasVol := candidate.HostVolumes[vol.Name]; hasVol { + continue + } + + if checker != nil { + if ok := checker.Feasible(candidate); !ok { + continue + } + } + + return candidate, nil + } + + return nil, fmt.Errorf("no node meets constraints") +} + +// placementContext implements the scheduler.ConstraintContext interface, a +// minimal subset of the scheduler.Context interface that we need to create a +// feasibility checker for constraints +type placementContext struct { + regexpCache map[string]*regexp.Regexp + versionCache map[string]scheduler.VerConstraints + semverCache map[string]scheduler.VerConstraints +} + +func (ctx *placementContext) Metrics() *structs.AllocMetric { return &structs.AllocMetric{} } +func (ctx *placementContext) RegexpCache() map[string]*regexp.Regexp { return ctx.regexpCache } + +func (ctx *placementContext) VersionConstraintCache() map[string]scheduler.VerConstraints { + return ctx.versionCache +} + +func (ctx *placementContext) SemverConstraintCache() map[string]scheduler.VerConstraints { + return ctx.semverCache +} + func (v *HostVolume) Delete(args *structs.HostVolumeDeleteRequest, reply *structs.HostVolumeDeleteResponse) error { authErr := v.srv.Authenticate(v.ctx, args) diff --git a/nomad/host_volume_endpoint_test.go b/nomad/host_volume_endpoint_test.go index 098cb0f26f6..dd90f684ff5 100644 --- a/nomad/host_volume_endpoint_test.go +++ b/nomad/host_volume_endpoint_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" @@ -156,6 +157,25 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { must.EqError(t, err, "Permission denied") }) + t.Run("invalid node constraints", func(t *testing.T) { + req.Volumes[0].Constraints[0].RTarget = "r2" + req.Volumes[1].Constraints[0].RTarget = "r2" + + defer func() { + req.Volumes[0].Constraints[0].RTarget = "r1" + req.Volumes[1].Constraints[0].RTarget = "r1" + }() + + var resp structs.HostVolumeCreateResponse + req.AuthToken = token + err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) + must.EqError(t, err, `2 errors occurred: + * could not place volume "example1": no node meets constraints + * could not place volume "example2": no node meets constraints + +`) + }) + t.Run("valid 
create", func(t *testing.T) { var resp structs.HostVolumeCreateResponse req.AuthToken = token @@ -611,6 +631,103 @@ func TestHostVolumeEndpoint_List(t *testing.T) { }) } +func TestHostVolumeEndpoint_placeVolume(t *testing.T) { + srv, _, cleanupSrv := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + t.Cleanup(cleanupSrv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.fsm.State() + + endpoint := &HostVolume{ + srv: srv, + logger: testlog.HCLogger(t), + } + + node0, node1, node2, node3 := mock.Node(), mock.Node(), mock.Node(), mock.Node() + node0.NodePool = structs.NodePoolDefault + node1.NodePool = "dev" + node1.Meta["rack"] = "r2" + node2.NodePool = "prod" + node3.NodePool = "prod" + node3.Meta["rack"] = "r3" + node3.HostVolumes = map[string]*structs.ClientHostVolumeConfig{"example": { + Name: "example", + Path: "/srv", + }} + + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node0)) + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node2)) + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node3)) + + testCases := []struct { + name string + vol *structs.HostVolume + expect *structs.Node + expectErr string + }{ + { + name: "only one in node pool", + vol: &structs.HostVolume{NodePool: "default"}, + expect: node0, + }, + { + name: "only one that matches constraints", + vol: &structs.HostVolume{Constraints: []*structs.Constraint{ + { + LTarget: "${meta.rack}", + RTarget: "r2", + Operand: "=", + }, + }}, + expect: node1, + }, + { + name: "only one available in pool", + vol: &structs.HostVolume{NodePool: "prod", Name: "example"}, + expect: node2, + }, + { + name: "no match", + vol: &structs.HostVolume{Constraints: []*structs.Constraint{ + { + LTarget: "${meta.rack}", + RTarget: "r6", + Operand: "=", + }, + }}, + expectErr: "no node meets constraints", + }, + { + name: "match is not available for placement", + vol: &structs.HostVolume{ + Name: "example", + Constraints: []*structs.Constraint{ + { + LTarget: "${meta.rack}", + RTarget: "r3", + Operand: "=", + }, + }}, + expectErr: "no node meets constraints", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + node, err := endpoint.placeHostVolume(tc.vol) + if tc.expectErr == "" { + must.NoError(t, err) + must.Eq(t, tc.expect, node) + } else { + must.EqError(t, err, tc.expectErr) + must.Nil(t, node) + } + }) + } +} + // mockHostVolumeClient models client RPCs that have side-effects on the // client host type mockHostVolumeClient struct { @@ -631,6 +748,7 @@ func newMockHostVolumeClient(t *testing.T, srv *Server, pool string) (*mockHostV c.Node.NodePool = pool // TODO(1.10.0): we'll want to have a version gate for this feature c.Node.Attributes["nomad.version"] = version.Version + c.Node.Meta["rack"] = "r1" }, srv.config.RPCAddr, map[string]any{"HostVolume": mockClientEndpoint}) t.Cleanup(cleanup) diff --git a/nomad/structs/host_volumes.go b/nomad/structs/host_volumes.go index 66a178a3423..21e827c3a9d 100644 --- a/nomad/structs/host_volumes.go +++ b/nomad/structs/host_volumes.go @@ -159,6 +159,12 @@ func (hv *HostVolume) Validate() error { if err := constraint.Validate(); err != nil { mErr = multierror.Append(mErr, fmt.Errorf("invalid constraint: %v", err)) } + switch constraint.Operand { + case ConstraintDistinctHosts, ConstraintDistinctProperty: + mErr = multierror.Append(mErr, fmt.Errorf( + "invalid constraint %s: host volumes of the same name are always on distinct hosts", 
constraint.Operand))
+		default:
+		}
 	}
 
 	return mErr.ErrorOrNil()
diff --git a/scheduler/context.go b/scheduler/context.go
index 887607cf3be..e48cefc3918 100644
--- a/scheduler/context.go
+++ b/scheduler/context.go
@@ -51,6 +51,15 @@ type Context interface {
 	SendEvent(event interface{})
 }
 
+// ConstraintContext is the minimal subset of the scheduler Context needed to
+// check whether a node meets a set of constraints.
+type ConstraintContext interface {
+	Metrics() *structs.AllocMetric
+	RegexpCache() map[string]*regexp.Regexp
+	VersionConstraintCache() map[string]VerConstraints
+	SemverConstraintCache() map[string]VerConstraints
+}
+
 // EvalCache is used to cache certain things during an evaluation
 type EvalCache struct {
 	reCache      map[string]*regexp.Regexp
diff --git a/scheduler/feasible.go b/scheduler/feasible.go
index 9ff3878baac..e6e7c81d4a3 100644
--- a/scheduler/feasible.go
+++ b/scheduler/feasible.go
@@ -752,12 +752,12 @@ func (iter *DistinctPropertyIterator) Reset() {
 // given set of constraints. This is used to filter on job, task group, and task
 // constraints.
 type ConstraintChecker struct {
-	ctx         Context
+	ctx         ConstraintContext
 	constraints []*structs.Constraint
 }
 
 // NewConstraintChecker creates a ConstraintChecker for a set of constraints
-func NewConstraintChecker(ctx Context, constraints []*structs.Constraint) *ConstraintChecker {
+func NewConstraintChecker(ctx ConstraintContext, constraints []*structs.Constraint) *ConstraintChecker {
 	return &ConstraintChecker{
 		ctx:         ctx,
 		constraints: constraints,
@@ -830,7 +830,7 @@ func resolveTarget(target string, node *structs.Node) (string, bool) {
 
 // checkConstraint checks if a constraint is satisfied. The lVal and rVal
 // interfaces may be nil.
-func checkConstraint(ctx Context, operand string, lVal, rVal interface{}, lFound, rFound bool) bool {
+func checkConstraint(ctx ConstraintContext, operand string, lVal, rVal interface{}, lFound, rFound bool) bool {
 	// Check for constraints not handled by this checker.
switch operand { case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty: @@ -852,14 +852,14 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}, lFound return !lFound case structs.ConstraintVersion: parser := newVersionConstraintParser(ctx) - return lFound && rFound && checkVersionMatch(ctx, parser, lVal, rVal) + return lFound && rFound && checkVersionMatch(parser, lVal, rVal) case structs.ConstraintSemver: parser := newSemverConstraintParser(ctx) - return lFound && rFound && checkVersionMatch(ctx, parser, lVal, rVal) + return lFound && rFound && checkVersionMatch(parser, lVal, rVal) case structs.ConstraintRegex: return lFound && rFound && checkRegexpMatch(ctx, lVal, rVal) case structs.ConstraintSetContains, structs.ConstraintSetContainsAll: - return lFound && rFound && checkSetContainsAll(ctx, lVal, rVal) + return lFound && rFound && checkSetContainsAll(lVal, rVal) case structs.ConstraintSetContainsAny: return lFound && rFound && checkSetContainsAny(lVal, rVal) default: @@ -943,7 +943,7 @@ func compareOrder[T cmp.Ordered](op string, left, right T) bool { // checkVersionMatch is used to compare a version on the // left hand side with a set of constraints on the right hand side -func checkVersionMatch(_ Context, parse verConstraintParser, lVal, rVal interface{}) bool { +func checkVersionMatch(parse verConstraintParser, lVal, rVal interface{}) bool { // Parse the version var versionStr string switch v := lVal.(type) { @@ -979,7 +979,7 @@ func checkVersionMatch(_ Context, parse verConstraintParser, lVal, rVal interfac // checkAttributeVersionMatch is used to compare a version on the // left hand side with a set of constraints on the right hand side -func checkAttributeVersionMatch(_ Context, parse verConstraintParser, lVal, rVal *psstructs.Attribute) bool { +func checkAttributeVersionMatch(parse verConstraintParser, lVal, rVal *psstructs.Attribute) bool { // Parse the version var versionStr string if s, ok := lVal.GetString(); ok { @@ -1014,7 +1014,7 @@ func checkAttributeVersionMatch(_ Context, parse verConstraintParser, lVal, rVal // checkRegexpMatch is used to compare a value on the // left hand side with a regexp on the right hand side -func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool { +func checkRegexpMatch(ctx ConstraintContext, lVal, rVal interface{}) bool { // Ensure left-hand is string lStr, ok := lVal.(string) if !ok { @@ -1047,7 +1047,7 @@ func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool { // checkSetContainsAll is used to see if the left hand side contains the // string on the right hand side -func checkSetContainsAll(_ Context, lVal, rVal interface{}) bool { +func checkSetContainsAll(lVal, rVal interface{}) bool { // Ensure left-hand is string lStr, ok := lVal.(string) if !ok { @@ -1424,7 +1424,7 @@ func resolveDeviceTarget(target string, d *structs.NodeDeviceResource) (*psstruc // checkAttributeConstraint checks if a constraint is satisfied. nil equality // comparisons are considered to be false. -func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs.Attribute, lFound, rFound bool) bool { +func checkAttributeConstraint(ctx ConstraintContext, operand string, lVal, rVal *psstructs.Attribute, lFound, rFound bool) bool { // Check for constraints not handled by this checker. 
switch operand { case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty: @@ -1484,7 +1484,7 @@ func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs } parser := newVersionConstraintParser(ctx) - return checkAttributeVersionMatch(ctx, parser, lVal, rVal) + return checkAttributeVersionMatch(parser, lVal, rVal) case structs.ConstraintSemver: if !(lFound && rFound) { @@ -1492,7 +1492,7 @@ func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs } parser := newSemverConstraintParser(ctx) - return checkAttributeVersionMatch(ctx, parser, lVal, rVal) + return checkAttributeVersionMatch(parser, lVal, rVal) case structs.ConstraintRegex: if !(lFound && rFound) { @@ -1516,7 +1516,7 @@ func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs return false } - return checkSetContainsAll(ctx, ls, rs) + return checkSetContainsAll(ls, rs) case structs.ConstraintSetContainsAny: if !(lFound && rFound) { return false @@ -1550,7 +1550,7 @@ type VerConstraints interface { // or semver). type verConstraintParser func(verConstraint string) VerConstraints -func newVersionConstraintParser(ctx Context) verConstraintParser { +func newVersionConstraintParser(ctx ConstraintContext) verConstraintParser { cache := ctx.VersionConstraintCache() return func(cstr string) VerConstraints { @@ -1568,7 +1568,7 @@ func newVersionConstraintParser(ctx Context) verConstraintParser { } } -func newSemverConstraintParser(ctx Context) verConstraintParser { +func newSemverConstraintParser(ctx ConstraintContext) verConstraintParser { cache := ctx.SemverConstraintCache() return func(cstr string) VerConstraints {
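
Reviewer note, not part of the patch: the change that makes this reuse
possible is the new scheduler.ConstraintContext seam. Below is a minimal
sketch of driving scheduler.NewConstraintChecker from outside the scheduler
package, mirroring the endpoint's placementContext. The exampleContext type,
the node fixture, and the rack constraint are illustrative assumptions, not
code from this changeset.

package main

import (
	"fmt"
	"regexp"

	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/scheduler"
)

// exampleContext satisfies scheduler.ConstraintContext the same way the
// endpoint's placementContext does: throwaway metrics plus small caches.
type exampleContext struct {
	re     map[string]*regexp.Regexp
	ver    map[string]scheduler.VerConstraints
	semver map[string]scheduler.VerConstraints
}

func (c *exampleContext) Metrics() *structs.AllocMetric          { return &structs.AllocMetric{} }
func (c *exampleContext) RegexpCache() map[string]*regexp.Regexp { return c.re }

func (c *exampleContext) VersionConstraintCache() map[string]scheduler.VerConstraints {
	return c.ver
}

func (c *exampleContext) SemverConstraintCache() map[string]scheduler.VerConstraints {
	return c.semver
}

func main() {
	ctx := &exampleContext{
		re:     make(map[string]*regexp.Regexp),
		ver:    make(map[string]scheduler.VerConstraints),
		semver: make(map[string]scheduler.VerConstraints),
	}

	// A constraint like one a user might set on a volume create request.
	checker := scheduler.NewConstraintChecker(ctx, []*structs.Constraint{
		{LTarget: "${meta.rack}", RTarget: "r2", Operand: "="},
	})

	// Candidate node; only Meta matters for this particular constraint.
	node := &structs.Node{Meta: map[string]string{"rack": "r2"}}
	fmt.Println(checker.Feasible(node)) // prints "true"
}

Passing this narrow interface rather than the full scheduler.Context is what
lets the RPC endpoint run the checker without standing up an evaluation.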