diff --git a/command/agent/host_volume_endpoint_test.go b/command/agent/host_volume_endpoint_test.go
index a328b85e42e..8a939a86582 100644
--- a/command/agent/host_volume_endpoint_test.go
+++ b/command/agent/host_volume_endpoint_test.go
@@ -21,6 +21,8 @@ func TestHostVolumeEndpoint_CRUD(t *testing.T) {
 
 	// Create a volume on the test node
 	vol := mock.HostVolumeRequest(structs.DefaultNamespace)
+	vol.NodePool = ""
+	vol.Constraints = nil
 	reqBody := struct {
 		Volumes []*structs.HostVolume
 	}{Volumes: []*structs.HostVolume{vol}}
diff --git a/command/volume_create_host_test.go b/command/volume_create_host_test.go
index bd4fff6f46a..ce713b20d5d 100644
--- a/command/volume_create_host_test.go
+++ b/command/volume_create_host_test.go
@@ -11,13 +11,16 @@ import (
 	"github.com/hashicorp/hcl"
 	"github.com/hashicorp/nomad/api"
 	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/command/agent"
 	"github.com/mitchellh/cli"
 	"github.com/shoenig/test/must"
 )
 
 func TestHostVolumeCreateCommand_Run(t *testing.T) {
 	ci.Parallel(t)
-	srv, client, url := testServer(t, true, nil)
+	srv, client, url := testServer(t, true, func(c *agent.Config) {
+		c.Client.Meta = map[string]string{"rack": "foo"}
+	})
 	t.Cleanup(srv.Shutdown)
 
 	waitForNodes(t, client)
@@ -38,11 +41,6 @@ node_pool = "default"
 capacity_min = "10GiB"
 capacity_max = "20G"
 
-constraint {
-	attribute = "${attr.kernel.name}"
-	value     = "linux"
-}
-
 constraint {
 	attribute = "${meta.rack}"
 	value     = "foo"
diff --git a/nomad/host_volume_endpoint.go b/nomad/host_volume_endpoint.go
index 4b4d09deac3..0f3fa457a65 100644
--- a/nomad/host_volume_endpoint.go
+++ b/nomad/host_volume_endpoint.go
@@ -6,6 +6,7 @@ package nomad
 import (
 	"fmt"
 	"net/http"
+	"regexp"
 	"strings"
 	"time"
 
@@ -19,6 +20,7 @@ import (
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/state/paginator"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/scheduler"
 )
 
 // HostVolume is the server RPC endpoint for host volumes
@@ -425,28 +427,12 @@ func (v *HostVolume) validateVolumeForState(vol *structs.HostVolume, snap *state
 
 func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
 
-	// TODO(1.10.0): proper node selection based on constraints and node
-	// pool. Also, should we move this into the validator step?
-	if vol.NodeID == "" {
-		var iter memdb.ResultIterator
-		var err error
-		var raw any
-		if vol.NodePool != "" {
-			iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool)
-		} else {
-			iter, err = v.srv.State().Nodes(nil)
-		}
-		if err != nil {
-			return err
-		}
-		raw = iter.Next()
-		if raw == nil {
-			return fmt.Errorf("no node meets constraints for volume")
-		}
-
-		node := raw.(*structs.Node)
-		vol.NodeID = node.ID
+	node, err := v.placeHostVolume(vol)
+	if err != nil {
+		return fmt.Errorf("could not place volume %q: %w", vol.Name, err)
 	}
+	vol.NodeID = node.ID
+	vol.NodePool = node.NodePool
 
 	method := "ClientHostVolume.Create"
 	cReq := &cstructs.ClientHostVolumeCreateRequest{
@@ -459,7 +445,7 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
 		Parameters: vol.Parameters,
 	}
 	cResp := &cstructs.ClientHostVolumeCreateResponse{}
-	err := v.srv.RPC(method, cReq, cResp)
+	err = v.srv.RPC(method, cReq, cResp)
 	if err != nil {
 		return err
 	}
@@ -474,6 +460,80 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
 	return nil
 }
 
+// placeHostVolume finds a node that matches the node pool and constraints
+// and that doesn't already have a volume of the same name. 
It returns a non-nil Node +// or an error indicating placement failed. +func (v *HostVolume) placeHostVolume(vol *structs.HostVolume) (*structs.Node, error) { + + var iter memdb.ResultIterator + var err error + if vol.NodePool != "" { + iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool) + } else { + iter, err = v.srv.State().Nodes(nil) + } + if err != nil { + return nil, err + } + + var checker *scheduler.ConstraintChecker + + if len(vol.Constraints) > 0 { + ctx := &placementContext{ + regexpCache: make(map[string]*regexp.Regexp), + versionCache: make(map[string]scheduler.VerConstraints), + semverCache: make(map[string]scheduler.VerConstraints), + } + checker = scheduler.NewConstraintChecker(ctx, vol.Constraints) + } + + for { + raw := iter.Next() + if raw == nil { + break + } + candidate := raw.(*structs.Node) + + // note: this is a race if multiple users create volumes of the same + // name concurrently, but we can't solve it on the server because we + // haven't yet written to state. The client will reject requests to + // create/register a volume with the same name with a different ID. + if _, hasVol := candidate.HostVolumes[vol.Name]; hasVol { + continue + } + + if checker != nil { + if ok := checker.Feasible(candidate); !ok { + continue + } + } + + return candidate, nil + } + + return nil, fmt.Errorf("no node meets constraints") +} + +// placementContext implements the scheduler.ConstraintContext interface, a +// minimal subset of the scheduler.Context interface that we need to create a +// feasibility checker for constraints +type placementContext struct { + regexpCache map[string]*regexp.Regexp + versionCache map[string]scheduler.VerConstraints + semverCache map[string]scheduler.VerConstraints +} + +func (ctx *placementContext) Metrics() *structs.AllocMetric { return &structs.AllocMetric{} } +func (ctx *placementContext) RegexpCache() map[string]*regexp.Regexp { return ctx.regexpCache } + +func (ctx *placementContext) VersionConstraintCache() map[string]scheduler.VerConstraints { + return ctx.versionCache +} + +func (ctx *placementContext) SemverConstraintCache() map[string]scheduler.VerConstraints { + return ctx.semverCache +} + func (v *HostVolume) Delete(args *structs.HostVolumeDeleteRequest, reply *structs.HostVolumeDeleteResponse) error { authErr := v.srv.Authenticate(v.ctx, args) diff --git a/nomad/host_volume_endpoint_test.go b/nomad/host_volume_endpoint_test.go index 098cb0f26f6..2a432d961cf 100644 --- a/nomad/host_volume_endpoint_test.go +++ b/nomad/host_volume_endpoint_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" @@ -156,6 +157,25 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { must.EqError(t, err, "Permission denied") }) + t.Run("invalid node constraints", func(t *testing.T) { + req.Volumes[0].Constraints[0].RTarget = "r2" + req.Volumes[1].Constraints[0].RTarget = "r2" + + defer func() { + req.Volumes[0].Constraints[0].RTarget = "r1" + req.Volumes[1].Constraints[0].RTarget = "r1" + }() + + var resp structs.HostVolumeCreateResponse + req.AuthToken = token + err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) + must.EqError(t, err, `2 errors occurred: + * could not place volume "example1": no node meets constraints + * could not place 
volume "example2": no node meets constraints + +`) + }) + t.Run("valid create", func(t *testing.T) { var resp structs.HostVolumeCreateResponse req.AuthToken = token @@ -611,6 +631,103 @@ func TestHostVolumeEndpoint_List(t *testing.T) { }) } +func TestHostVolumeEndpoint_placeVolume(t *testing.T) { + srv, _, cleanupSrv := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + t.Cleanup(cleanupSrv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.fsm.State() + + endpoint := &HostVolume{ + srv: srv, + logger: testlog.HCLogger(t), + } + + node0, node1, node2, node3 := mock.Node(), mock.Node(), mock.Node(), mock.Node() + node0.NodePool = structs.NodePoolDefault + node1.NodePool = "dev" + node1.Meta["rack"] = "r2" + node2.NodePool = "prod" + node3.NodePool = "prod" + node3.Meta["rack"] = "r3" + node3.HostVolumes = map[string]*structs.ClientHostVolumeConfig{"example": { + Name: "example", + Path: "/srv", + }} + + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node0)) + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node2)) + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node3)) + + testCases := []struct { + name string + vol *structs.HostVolume + expect *structs.Node + expectErr string + }{ + { + name: "only one in node pool", + vol: &structs.HostVolume{NodePool: "default"}, + expect: node0, + }, + { + name: "only one that matches constraints", + vol: &structs.HostVolume{Constraints: []*structs.Constraint{ + { + LTarget: "${meta.rack}", + RTarget: "r2", + Operand: "=", + }, + }}, + expect: node1, + }, + { + name: "only one available in pool", + vol: &structs.HostVolume{NodePool: "prod", Name: "example"}, + expect: node2, + }, + { + name: "no match", + vol: &structs.HostVolume{Constraints: []*structs.Constraint{ + { + LTarget: "${meta.rack}", + RTarget: "r6", + Operand: "=", + }, + }}, + expectErr: "no node meets constraints", + }, + { + name: "match already has a volume with the same name", + vol: &structs.HostVolume{ + Name: "example", + Constraints: []*structs.Constraint{ + { + LTarget: "${meta.rack}", + RTarget: "r3", + Operand: "=", + }, + }}, + expectErr: "no node meets constraints", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + node, err := endpoint.placeHostVolume(tc.vol) + if tc.expectErr == "" { + must.NoError(t, err) + must.Eq(t, tc.expect, node) + } else { + must.EqError(t, err, tc.expectErr) + must.Nil(t, node) + } + }) + } +} + // mockHostVolumeClient models client RPCs that have side-effects on the // client host type mockHostVolumeClient struct { @@ -631,6 +748,7 @@ func newMockHostVolumeClient(t *testing.T, srv *Server, pool string) (*mockHostV c.Node.NodePool = pool // TODO(1.10.0): we'll want to have a version gate for this feature c.Node.Attributes["nomad.version"] = version.Version + c.Node.Meta["rack"] = "r1" }, srv.config.RPCAddr, map[string]any{"HostVolume": mockClientEndpoint}) t.Cleanup(cleanup) diff --git a/nomad/state/state_store_host_volumes.go b/nomad/state/state_store_host_volumes.go index 732399ffb46..27013b05d90 100644 --- a/nomad/state/state_store_host_volumes.go +++ b/nomad/state/state_store_host_volumes.go @@ -87,6 +87,8 @@ func (s *StateStore) UpsertHostVolumes(index uint64, volumes []*structs.HostVolu if _, ok := node.HostVolumes[v.Name]; ok { v.State = structs.HostVolumeStateReady } + // Register RPCs for new volumes may not have the node pool set + v.NodePool = node.NodePool // 
Allocations are denormalized on read, so we don't want these to be // written to the state store. diff --git a/nomad/structs/host_volumes.go b/nomad/structs/host_volumes.go index 66a178a3423..21e827c3a9d 100644 --- a/nomad/structs/host_volumes.go +++ b/nomad/structs/host_volumes.go @@ -159,6 +159,12 @@ func (hv *HostVolume) Validate() error { if err := constraint.Validate(); err != nil { mErr = multierror.Append(mErr, fmt.Errorf("invalid constraint: %v", err)) } + switch constraint.Operand { + case ConstraintDistinctHosts, ConstraintDistinctProperty: + mErr = multierror.Append(mErr, fmt.Errorf( + "invalid constraint %s: host volumes of the same name are always on distinct hosts", constraint.Operand)) + default: + } } return mErr.ErrorOrNil() diff --git a/scheduler/context.go b/scheduler/context.go index 887607cf3be..e48cefc3918 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -51,6 +51,13 @@ type Context interface { SendEvent(event interface{}) } +type ConstraintContext interface { + Metrics() *structs.AllocMetric + RegexpCache() map[string]*regexp.Regexp + VersionConstraintCache() map[string]VerConstraints + SemverConstraintCache() map[string]VerConstraints +} + // EvalCache is used to cache certain things during an evaluation type EvalCache struct { reCache map[string]*regexp.Regexp diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 9ff3878baac..e6e7c81d4a3 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -752,12 +752,12 @@ func (iter *DistinctPropertyIterator) Reset() { // given set of constraints. This is used to filter on job, task group, and task // constraints. type ConstraintChecker struct { - ctx Context + ctx ConstraintContext constraints []*structs.Constraint } // NewConstraintChecker creates a ConstraintChecker for a set of constraints -func NewConstraintChecker(ctx Context, constraints []*structs.Constraint) *ConstraintChecker { +func NewConstraintChecker(ctx ConstraintContext, constraints []*structs.Constraint) *ConstraintChecker { return &ConstraintChecker{ ctx: ctx, constraints: constraints, @@ -830,7 +830,7 @@ func resolveTarget(target string, node *structs.Node) (string, bool) { // checkConstraint checks if a constraint is satisfied. The lVal and rVal // interfaces may be nil. -func checkConstraint(ctx Context, operand string, lVal, rVal interface{}, lFound, rFound bool) bool { +func checkConstraint(ctx ConstraintContext, operand string, lVal, rVal interface{}, lFound, rFound bool) bool { // Check for constraints not handled by this checker. 
switch operand { case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty: @@ -852,14 +852,14 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}, lFound return !lFound case structs.ConstraintVersion: parser := newVersionConstraintParser(ctx) - return lFound && rFound && checkVersionMatch(ctx, parser, lVal, rVal) + return lFound && rFound && checkVersionMatch(parser, lVal, rVal) case structs.ConstraintSemver: parser := newSemverConstraintParser(ctx) - return lFound && rFound && checkVersionMatch(ctx, parser, lVal, rVal) + return lFound && rFound && checkVersionMatch(parser, lVal, rVal) case structs.ConstraintRegex: return lFound && rFound && checkRegexpMatch(ctx, lVal, rVal) case structs.ConstraintSetContains, structs.ConstraintSetContainsAll: - return lFound && rFound && checkSetContainsAll(ctx, lVal, rVal) + return lFound && rFound && checkSetContainsAll(lVal, rVal) case structs.ConstraintSetContainsAny: return lFound && rFound && checkSetContainsAny(lVal, rVal) default: @@ -943,7 +943,7 @@ func compareOrder[T cmp.Ordered](op string, left, right T) bool { // checkVersionMatch is used to compare a version on the // left hand side with a set of constraints on the right hand side -func checkVersionMatch(_ Context, parse verConstraintParser, lVal, rVal interface{}) bool { +func checkVersionMatch(parse verConstraintParser, lVal, rVal interface{}) bool { // Parse the version var versionStr string switch v := lVal.(type) { @@ -979,7 +979,7 @@ func checkVersionMatch(_ Context, parse verConstraintParser, lVal, rVal interfac // checkAttributeVersionMatch is used to compare a version on the // left hand side with a set of constraints on the right hand side -func checkAttributeVersionMatch(_ Context, parse verConstraintParser, lVal, rVal *psstructs.Attribute) bool { +func checkAttributeVersionMatch(parse verConstraintParser, lVal, rVal *psstructs.Attribute) bool { // Parse the version var versionStr string if s, ok := lVal.GetString(); ok { @@ -1014,7 +1014,7 @@ func checkAttributeVersionMatch(_ Context, parse verConstraintParser, lVal, rVal // checkRegexpMatch is used to compare a value on the // left hand side with a regexp on the right hand side -func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool { +func checkRegexpMatch(ctx ConstraintContext, lVal, rVal interface{}) bool { // Ensure left-hand is string lStr, ok := lVal.(string) if !ok { @@ -1047,7 +1047,7 @@ func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool { // checkSetContainsAll is used to see if the left hand side contains the // string on the right hand side -func checkSetContainsAll(_ Context, lVal, rVal interface{}) bool { +func checkSetContainsAll(lVal, rVal interface{}) bool { // Ensure left-hand is string lStr, ok := lVal.(string) if !ok { @@ -1424,7 +1424,7 @@ func resolveDeviceTarget(target string, d *structs.NodeDeviceResource) (*psstruc // checkAttributeConstraint checks if a constraint is satisfied. nil equality // comparisons are considered to be false. -func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs.Attribute, lFound, rFound bool) bool { +func checkAttributeConstraint(ctx ConstraintContext, operand string, lVal, rVal *psstructs.Attribute, lFound, rFound bool) bool { // Check for constraints not handled by this checker. 
switch operand { case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty: @@ -1484,7 +1484,7 @@ func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs } parser := newVersionConstraintParser(ctx) - return checkAttributeVersionMatch(ctx, parser, lVal, rVal) + return checkAttributeVersionMatch(parser, lVal, rVal) case structs.ConstraintSemver: if !(lFound && rFound) { @@ -1492,7 +1492,7 @@ func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs } parser := newSemverConstraintParser(ctx) - return checkAttributeVersionMatch(ctx, parser, lVal, rVal) + return checkAttributeVersionMatch(parser, lVal, rVal) case structs.ConstraintRegex: if !(lFound && rFound) { @@ -1516,7 +1516,7 @@ func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs return false } - return checkSetContainsAll(ctx, ls, rs) + return checkSetContainsAll(ls, rs) case structs.ConstraintSetContainsAny: if !(lFound && rFound) { return false @@ -1550,7 +1550,7 @@ type VerConstraints interface { // or semver). type verConstraintParser func(verConstraint string) VerConstraints -func newVersionConstraintParser(ctx Context) verConstraintParser { +func newVersionConstraintParser(ctx ConstraintContext) verConstraintParser { cache := ctx.VersionConstraintCache() return func(cstr string) VerConstraints { @@ -1568,7 +1568,7 @@ func newVersionConstraintParser(ctx Context) verConstraintParser { } } -func newSemverConstraintParser(ctx Context) verConstraintParser { +func newSemverConstraintParser(ctx ConstraintContext) verConstraintParser { cache := ctx.SemverConstraintCache() return func(cstr string) VerConstraints { diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index f552b70c9f3..4e887752989 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -1263,7 +1263,7 @@ func TestCheckVersionConstraint(t *testing.T) { for _, tc := range cases { _, ctx := testContext(t) p := newVersionConstraintParser(ctx) - if res := checkVersionMatch(ctx, p, tc.lVal, tc.rVal); res != tc.result { + if res := checkVersionMatch(p, tc.lVal, tc.rVal); res != tc.result { t.Fatalf("TC: %#v, Result: %v", tc, res) } } @@ -1345,7 +1345,7 @@ func TestCheckSemverConstraint(t *testing.T) { t.Run(tc.name, func(t *testing.T) { _, ctx := testContext(t) p := newSemverConstraintParser(ctx) - actual := checkVersionMatch(ctx, p, tc.lVal, tc.rVal) + actual := checkVersionMatch(p, tc.lVal, tc.rVal) must.Eq(t, tc.result, actual) }) }
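
The interface narrowing above (`Context` → `ConstraintContext`) is what lets the `HostVolume` endpoint reuse the scheduler's feasibility checkers without carrying a full scheduler context. Below is a minimal sketch of driving `ConstraintChecker` through that seam, analogous to the endpoint's `placementContext`; the `constraintCtx` type and the `rack` meta key here are illustrative, not part of this change:

```go
package main

import (
	"fmt"
	"regexp"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/scheduler"
)

// constraintCtx satisfies scheduler.ConstraintContext with per-use caches
// and a throwaway AllocMetric, mirroring the endpoint's placementContext.
type constraintCtx struct {
	re     map[string]*regexp.Regexp
	ver    map[string]scheduler.VerConstraints
	semver map[string]scheduler.VerConstraints
}

func (c *constraintCtx) Metrics() *structs.AllocMetric          { return &structs.AllocMetric{} }
func (c *constraintCtx) RegexpCache() map[string]*regexp.Regexp { return c.re }
func (c *constraintCtx) VersionConstraintCache() map[string]scheduler.VerConstraints {
	return c.ver
}
func (c *constraintCtx) SemverConstraintCache() map[string]scheduler.VerConstraints {
	return c.semver
}

func main() {
	ctx := &constraintCtx{
		re:     map[string]*regexp.Regexp{},
		ver:    map[string]scheduler.VerConstraints{},
		semver: map[string]scheduler.VerConstraints{},
	}

	// The same shape of constraint the host volume tests use.
	checker := scheduler.NewConstraintChecker(ctx, []*structs.Constraint{
		{LTarget: "${meta.rack}", RTarget: "r1", Operand: "="},
	})

	node := mock.Node()
	node.Meta["rack"] = "r1"
	fmt.Println(checker.Feasible(node)) // true: rack matches

	node.Meta["rack"] = "r2"
	fmt.Println(checker.Feasible(node)) // false: filtered by the constraint
}
```

Narrowing to an interface rather than exporting the scheduler's concrete context keeps the `EvalCache` plumbing private while letting server RPCs supply only the caches the checkers actually touch.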