diff --git a/api/host_volumes.go b/api/host_volumes.go index dae11afc68..985695fa70 100644 --- a/api/host_volumes.go +++ b/api/host_volumes.go @@ -147,11 +147,11 @@ func (c *Client) HostVolumes() *HostVolumes { } type HostVolumeCreateRequest struct { - Volumes []*HostVolume + Volume *HostVolume } type HostVolumeRegisterRequest struct { - Volumes []*HostVolume + Volume *HostVolume } type HostVolumeListRequest struct { @@ -163,30 +163,30 @@ type HostVolumeDeleteRequest struct { VolumeIDs []string } -// Create forwards to client agents so host volumes can be created on those -// hosts, and registers the volumes with Nomad servers. -func (hv *HostVolumes) Create(req *HostVolumeCreateRequest, opts *WriteOptions) ([]*HostVolume, *WriteMeta, error) { +// Create forwards to client agents so a host volume can be created on those +// hosts, and registers the volume with Nomad servers. +func (hv *HostVolumes) Create(req *HostVolumeCreateRequest, opts *WriteOptions) (*HostVolume, *WriteMeta, error) { var out struct { - Volumes []*HostVolume + Volume *HostVolume } wm, err := hv.client.put("/v1/volume/host/create", req, &out, opts) if err != nil { return nil, wm, err } - return out.Volumes, wm, nil + return out.Volume, wm, nil } -// Register registers host volumes that were created out-of-band with the Nomad +// Register registers a host volume that was created out-of-band with the Nomad // servers. -func (hv *HostVolumes) Register(req *HostVolumeRegisterRequest, opts *WriteOptions) ([]*HostVolume, *WriteMeta, error) { +func (hv *HostVolumes) Register(req *HostVolumeRegisterRequest, opts *WriteOptions) (*HostVolume, *WriteMeta, error) { var out struct { - Volumes []*HostVolume + Volume *HostVolume } wm, err := hv.client.put("/v1/volume/host/register", req, &out, opts) if err != nil { return nil, wm, err } - return out.Volumes, wm, nil + return out.Volume, wm, nil } // Get queries for a single host volume, by ID diff --git a/command/agent/host_volume_endpoint_test.go b/command/agent/host_volume_endpoint_test.go index 8a939a8658..ddff7a33fb 100644 --- a/command/agent/host_volume_endpoint_test.go +++ b/command/agent/host_volume_endpoint_test.go @@ -24,8 +24,8 @@ func TestHostVolumeEndpoint_CRUD(t *testing.T) { vol.NodePool = "" vol.Constraints = nil reqBody := struct { - Volumes []*structs.HostVolume - }{Volumes: []*structs.HostVolume{vol}} + Volume *structs.HostVolume + }{Volume: vol} buf := encodeReq(reqBody) req, err := http.NewRequest(http.MethodPut, "/v1/volume/host/create", buf) must.NoError(t, err) @@ -37,12 +37,12 @@ func TestHostVolumeEndpoint_CRUD(t *testing.T) { must.NoError(t, err) must.NotNil(t, obj) resp := obj.(*structs.HostVolumeCreateResponse) - must.Len(t, 1, resp.Volumes) - must.Eq(t, vol.Name, resp.Volumes[0].Name) - must.Eq(t, s.client.NodeID(), resp.Volumes[0].NodeID) + must.NotNil(t, resp.Volume) + must.Eq(t, vol.Name, resp.Volume.Name) + must.Eq(t, s.client.NodeID(), resp.Volume.NodeID) must.NotEq(t, "", respW.Result().Header.Get("X-Nomad-Index")) - volID := resp.Volumes[0].ID + volID := resp.Volume.ID // Verify volume was created @@ -61,8 +61,8 @@ func TestHostVolumeEndpoint_CRUD(t *testing.T) { vol = respVol.Copy() vol.Parameters = map[string]string{"bar": "foo"} // swaps key and value reqBody = struct { - Volumes []*structs.HostVolume - }{Volumes: []*structs.HostVolume{vol}} + Volume *structs.HostVolume + }{Volume: vol} buf = encodeReq(reqBody) req, err = http.NewRequest(http.MethodPut, "/v1/volume/host/register", buf) must.NoError(t, err) @@ -70,8 +70,8 @@ func 
TestHostVolumeEndpoint_CRUD(t *testing.T) { must.NoError(t, err) must.NotNil(t, obj) regResp := obj.(*structs.HostVolumeRegisterResponse) - must.Len(t, 1, regResp.Volumes) - must.Eq(t, map[string]string{"bar": "foo"}, regResp.Volumes[0].Parameters) + must.NotNil(t, regResp.Volume) + must.Eq(t, map[string]string{"bar": "foo"}, regResp.Volume.Parameters) // Verify volume was updated diff --git a/command/volume_create_host.go b/command/volume_create_host.go index 8947244aba..62ccf1a418 100644 --- a/command/volume_create_host.go +++ b/command/volume_create_host.go @@ -28,9 +28,9 @@ func (c *VolumeCreateCommand) hostVolumeCreate( } req := &api.HostVolumeCreateRequest{ - Volumes: []*api.HostVolume{vol}, + Volume: vol, } - vols, _, err := client.HostVolumes().Create(req, nil) + vol, _, err = client.HostVolumes().Create(req, nil) if err != nil { c.Ui.Error(fmt.Sprintf("Error creating volume: %s", err)) return 1 @@ -39,19 +39,15 @@ func (c *VolumeCreateCommand) hostVolumeCreate( var volID string var lastIndex uint64 - // note: the command only ever returns 1 volume from the API - for _, vol := range vols { - if detach || vol.State == api.HostVolumeStateReady { - c.Ui.Output(fmt.Sprintf( - "Created host volume %s with ID %s", vol.Name, vol.ID)) - return 0 - } else { - c.Ui.Output(fmt.Sprintf( - "==> Created host volume %s with ID %s", vol.Name, vol.ID)) - volID = vol.ID - lastIndex = vol.ModifyIndex - break - } + if detach || vol.State == api.HostVolumeStateReady { + c.Ui.Output(fmt.Sprintf( + "Created host volume %s with ID %s", vol.Name, vol.ID)) + return 0 + } else { + c.Ui.Output(fmt.Sprintf( + "==> Created host volume %s with ID %s", vol.Name, vol.ID)) + volID = vol.ID + lastIndex = vol.ModifyIndex } err = c.monitorHostVolume(client, volID, lastIndex, verbose) diff --git a/command/volume_register_host.go b/command/volume_register_host.go index 705f2faaf2..4e3ce6ccdd 100644 --- a/command/volume_register_host.go +++ b/command/volume_register_host.go @@ -18,18 +18,15 @@ func (c *VolumeRegisterCommand) hostVolumeRegister(client *api.Client, ast *ast. 
} req := &api.HostVolumeRegisterRequest{ - Volumes: []*api.HostVolume{vol}, + Volume: vol, } - vols, _, err := client.HostVolumes().Register(req, nil) + vol, _, err = client.HostVolumes().Register(req, nil) if err != nil { c.Ui.Error(fmt.Sprintf("Error registering volume: %s", err)) return 1 } - for _, vol := range vols { - // note: the command only ever returns 1 volume from the API - c.Ui.Output(fmt.Sprintf( - "Registered host volume %s with ID %s", vol.Name, vol.ID)) - } + c.Ui.Output(fmt.Sprintf( + "Registered host volume %s with ID %s", vol.Name, vol.ID)) return 0 } diff --git a/nomad/fsm.go b/nomad/fsm.go index 16a52e0810..9ea3267457 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -2428,7 +2428,7 @@ func (n *nomadFSM) applyHostVolumeRegister(msgType structs.MessageType, buf []by panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.UpsertHostVolumes(index, req.Volumes); err != nil { + if err := n.state.UpsertHostVolume(index, req.Volume); err != nil { n.logger.Error("UpsertHostVolumes failed", "error", err) return err } diff --git a/nomad/host_volume_endpoint.go b/nomad/host_volume_endpoint.go index 0f3fa457a6..cd7b629890 100644 --- a/nomad/host_volume_endpoint.go +++ b/nomad/host_volume_endpoint.go @@ -216,64 +216,70 @@ func (v *HostVolume) Create(args *structs.HostVolumeCreateRequest, reply *struct return err } - if len(args.Volumes) == 0 { + if args.Volume == nil { return fmt.Errorf("missing volume definition") } - for _, vol := range args.Volumes { - if vol.Namespace == "" { - vol.Namespace = args.RequestNamespace() - } - if !allowVolume(aclObj, vol.Namespace) { - return structs.ErrPermissionDenied - } + vol := args.Volume + if vol.Namespace == "" { + vol.Namespace = args.RequestNamespace() + } + if !allowVolume(aclObj, vol.Namespace) { + return structs.ErrPermissionDenied } - // ensure we only try to create valid volumes or make valid updates to - // volumes - validVols, err := v.validateVolumeUpdates(args.Volumes) + // ensure we only try to create a valid volume or make valid updates to a + // volume + now := time.Now() + snap, err := v.srv.State().Snapshot() if err != nil { - return helper.FlattenMultierror(err) + return err } - // Attempt to create all the validated volumes and write only successfully - // created volumes to raft. And we'll report errors for any failed volumes + vol, err = v.validateVolumeUpdate(vol, snap, now) + if err != nil { + return err + } + + _, err = v.placeHostVolume(snap, vol) + if err != nil { + return fmt.Errorf("could not place volume %q: %w", vol.Name, err) + } + + warn, err := v.enforceEnterprisePolicy( + snap, vol, args.GetIdentity().GetACLToken(), args.PolicyOverride) + if warn != nil { + reply.Warnings = warn.Error() + } + if err != nil { + return err + } + + // Attempt to create the volume on the client. // // NOTE: creating the volume on the client via the plugin can't be made // atomic with the registration, and creating the volume provides values we // want to write on the Volume in raft anyways. - - // This can't reuse the validVols slice because we only want to write - // volumes we've successfully created or updated on the client to get - // updated in Raft. 
- raftArgs := &structs.HostVolumeRegisterRequest{ - Volumes: []*structs.HostVolume{}, - WriteRequest: args.WriteRequest, - } - - var mErr *multierror.Error - for _, vol := range validVols { - err = v.createVolume(vol) // mutates the vol - if err != nil { - mErr = multierror.Append(mErr, err) - } else { - raftArgs.Volumes = append(raftArgs.Volumes, vol) - } + err = v.createVolume(vol) + if err != nil { + return err } - // if we created or updated any volumes, apply them to raft. - var index uint64 - if len(raftArgs.Volumes) > 0 { - _, index, err = v.srv.raftApply(structs.HostVolumeRegisterRequestType, raftArgs) - if err != nil { - v.logger.Error("raft apply failed", "error", err, "method", "register") - mErr = multierror.Append(mErr, err) - } + // Write a newly created or modified volume to raft. We create a new request + // here because we've likely mutated the volume. + _, index, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType, + &structs.HostVolumeRegisterRequest{ + Volume: vol, + WriteRequest: args.WriteRequest, + }) + if err != nil { + v.logger.Error("raft apply failed", "error", err, "method", "register") + return err } - reply.Volumes = raftArgs.Volumes + reply.Volume = vol reply.Index = index - return helper.FlattenMultierror(mErr) + return nil } func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *structs.HostVolumeRegisterResponse) error { @@ -294,105 +300,97 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st return err } - if len(args.Volumes) == 0 { + if args.Volume == nil { return fmt.Errorf("missing volume definition") } - for _, vol := range args.Volumes { - if vol.Namespace == "" { - vol.Namespace = args.RequestNamespace() - } - if !allowVolume(aclObj, vol.Namespace) { - return structs.ErrPermissionDenied - } + vol := args.Volume + if vol.Namespace == "" { + vol.Namespace = args.RequestNamespace() + } + if !allowVolume(aclObj, vol.Namespace) { + return structs.ErrPermissionDenied } - // ensure we only try to create valid volumes or make valid updates to - // volumes - validVols, err := v.validateVolumeUpdates(args.Volumes) + snap, err := v.srv.State().Snapshot() if err != nil { - return helper.FlattenMultierror(err) + return err } - raftArgs := &structs.HostVolumeRegisterRequest{ - Volumes: validVols, - WriteRequest: args.WriteRequest, + now := time.Now() + vol, err = v.validateVolumeUpdate(vol, snap, now) + if err != nil { + return err } - var mErr *multierror.Error - var index uint64 - if len(raftArgs.Volumes) > 0 { - _, index, err = v.srv.raftApply(structs.HostVolumeRegisterRequestType, raftArgs) - if err != nil { - v.logger.Error("raft apply failed", "error", err, "method", "register") - mErr = multierror.Append(mErr, err) - } + warn, err := v.enforceEnterprisePolicy( + snap, vol, args.GetIdentity().GetACLToken(), args.PolicyOverride) + if warn != nil { + reply.Warnings = warn.Error() + } + if err != nil { + return err } - reply.Volumes = raftArgs.Volumes + // Write a newly created or modified volume to raft. We create a new request + // here because we've likely mutated the volume. 
+ _, index, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType, + &structs.HostVolumeRegisterRequest{ + Volume: vol, + WriteRequest: args.WriteRequest, + }) + if err != nil { + v.logger.Error("raft apply failed", "error", err, "method", "register") + return err + } + + reply.Volume = vol reply.Index = index - return helper.FlattenMultierror(mErr) + return nil } -func (v *HostVolume) validateVolumeUpdates(requested []*structs.HostVolume) ([]*structs.HostVolume, error) { +func (v *HostVolume) validateVolumeUpdate( + vol *structs.HostVolume, + snap *state.StateSnapshot, + now time.Time) (*structs.HostVolume, error) { - now := time.Now() - var vols []*structs.HostVolume - - snap, err := v.srv.State().Snapshot() + // validate the volume spec + err := vol.Validate() if err != nil { - return nil, err + return nil, fmt.Errorf("volume validation failed: %v", err) } - var mErr *multierror.Error - for _, vol := range requested { - - // validate the volume spec - err := vol.Validate() + // validate any update we're making + var existing *structs.HostVolume + volID := vol.ID + if vol.ID != "" { + existing, err = snap.HostVolumeByID(nil, vol.Namespace, vol.ID, true) if err != nil { - mErr = multierror.Append(mErr, fmt.Errorf("volume validation failed: %v", err)) - continue + return nil, err // should never hit, bail out } + if existing == nil { + return nil, fmt.Errorf("cannot update volume %q: volume does not exist", vol.ID) - // validate any update we're making - var existing *structs.HostVolume - volID := vol.ID - if vol.ID != "" { - existing, err = snap.HostVolumeByID(nil, vol.Namespace, vol.ID, true) - if err != nil { - return nil, err // should never hit, bail out - } - if existing == nil { - mErr = multierror.Append(mErr, - fmt.Errorf("cannot update volume %q: volume does not exist", vol.ID)) - continue - } - err = vol.ValidateUpdate(existing) - if err != nil { - mErr = multierror.Append(mErr, - fmt.Errorf("validating volume %q update failed: %v", vol.ID, err)) - continue - } - } else { - // capture this for nicer error messages later - volID = vol.Name } - - // set zero values as needed, possibly from existing - vol.CanonicalizeForUpdate(existing, now) - - // make sure any nodes or pools actually exist - err = v.validateVolumeForState(vol, snap) + err = vol.ValidateUpdate(existing) if err != nil { - mErr = multierror.Append(mErr, - fmt.Errorf("validating volume %q against state failed: %v", volID, err)) - continue + return nil, fmt.Errorf("validating volume %q update failed: %v", vol.ID, err) } + } else { + // capture this for nicer error messages later + volID = vol.Name + } - vols = append(vols, vol) + // set zero values as needed, possibly from existing + vol.CanonicalizeForUpdate(existing, now) + + // make sure any nodes or pools actually exist + err = v.validateVolumeForState(vol, snap) + if err != nil { + return nil, fmt.Errorf("validating volume %q against state failed: %v", volID, err) } - return vols, mErr.ErrorOrNil() + return vol, nil } // validateVolumeForState ensures that any references to node IDs or node pools are valid @@ -427,13 +425,6 @@ func (v *HostVolume) validateVolumeForState(vol *structs.HostVolume, snap *state func (v *HostVolume) createVolume(vol *structs.HostVolume) error { - node, err := v.placeHostVolume(vol) - if err != nil { - return fmt.Errorf("could not place volume %q: %w", vol.Name, err) - } - vol.NodeID = node.ID - vol.NodePool = node.NodePool - method := "ClientHostVolume.Create" cReq := &cstructs.ClientHostVolumeCreateRequest{ ID: vol.ID, @@ 
-445,7 +436,7 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error { Parameters: vol.Parameters, } cResp := &cstructs.ClientHostVolumeCreateResponse{} - err = v.srv.RPC(method, cReq, cResp) + err := v.srv.RPC(method, cReq, cResp) if err != nil { return err } @@ -460,17 +451,29 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error { return nil } -// placeHostVolume finds a node that matches the node pool and constraints, -// which doesn't already have a volume by that name. It returns a non-nil Node -// or an error indicating placement failed. -func (v *HostVolume) placeHostVolume(vol *structs.HostVolume) (*structs.Node, error) { +// placeHostVolume sets the node for a volume that doesn't already have one. +// The chosen node must match the volume's node pool and constraints and must +// not already have a volume with the same name. It returns the selected node +// (for testing) or an error if placement failed. +func (v *HostVolume) placeHostVolume(snap *state.StateSnapshot, vol *structs.HostVolume) (*structs.Node, error) { + if vol.NodeID != "" { + node, err := snap.NodeByID(nil, vol.NodeID) + if err != nil { + return nil, err + } + if node == nil { + return nil, fmt.Errorf("no such node %s", vol.NodeID) + } + vol.NodePool = node.NodePool + return node, nil + } var iter memdb.ResultIterator var err error if vol.NodePool != "" { - iter, err = v.srv.State().NodesByNodePool(nil, vol.NodePool) + iter, err = snap.NodesByNodePool(nil, vol.NodePool) } else { - iter, err = v.srv.State().Nodes(nil) + iter, err = snap.Nodes(nil) } if err != nil { return nil, err @@ -508,7 +511,10 @@ func (v *HostVolume) placeHostVolume(vol *structs.HostVolume) (*structs.Node, er } } + vol.NodeID = candidate.ID + vol.NodePool = candidate.NodePool return candidate, nil + } return nil, fmt.Errorf("no node meets constraints") diff --git a/nomad/host_volume_endpoint_ce.go b/nomad/host_volume_endpoint_ce.go new file mode 100644 index 0000000000..756df5f429 --- /dev/null +++ b/nomad/host_volume_endpoint_ce.go @@ -0,0 +1,23 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +//go:build !ent +// +build !ent + +package nomad + +import ( + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +// enforceEnterprisePolicy is the CE stub for Enterprise governance via +// Sentinel policy, quotas, and node pools +func (v *HostVolume) enforceEnterprisePolicy( + _ *state.StateSnapshot, + _ *structs.HostVolume, + _ *structs.ACLToken, + _ bool, +) (error, error) { + return nil, nil +} diff --git a/nomad/host_volume_endpoint_test.go b/nomad/host_volume_endpoint_test.go index 2a432d961c..81cd7c5547 100644 --- a/nomad/host_volume_endpoint_test.go +++ b/nomad/host_volume_endpoint_test.go @@ -68,7 +68,6 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { codec := rpcClient(t, srv) req := &structs.HostVolumeCreateRequest{ - Volumes: []*structs.HostVolume{}, WriteRequest: structs.WriteRequest{ Region: srv.Region(), AuthToken: token}, @@ -81,39 +80,37 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) must.EqError(t, err, "missing volume definition") - req.Volumes = []*structs.HostVolume{ - {}, // missing basic fields - { - Name: "example", - PluginID: "example_plugin", - Constraints: []*structs.Constraint{{ - RTarget: "r1", - Operand: "=", - }}, - RequestedCapacityMinBytes: 200000, - RequestedCapacityMaxBytes: 100000, - RequestedCapabilities: []*structs.HostVolumeCapability{ - { - AttachmentMode: structs.HostVolumeAttachmentModeFilesystem, - AccessMode: structs.HostVolumeAccessModeSingleNodeWriter, - }, - { - AttachmentMode: "bad", - AccessMode: "invalid", - }, - }, - }, // fails other field validations - } + req.Volume = &structs.HostVolume{} err = msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) - // TODO(1.10.0): nested multierrors are really ugly, we could really use - // some helper functions to make these nicer everywhere they pop up - must.EqError(t, err, `2 errors occurred: - * volume validation failed: 2 errors occurred: + must.EqError(t, err, `volume validation failed: 2 errors occurred: * missing name * must include at least one capability block +`) + + req.Volume = &structs.HostVolume{ + Name: "example", + PluginID: "example_plugin", + Constraints: []*structs.Constraint{{ + RTarget: "r1", + Operand: "=", + }}, + RequestedCapacityMinBytes: 200000, + RequestedCapacityMaxBytes: 100000, + RequestedCapabilities: []*structs.HostVolumeCapability{ + { + AttachmentMode: structs.HostVolumeAttachmentModeFilesystem, + AccessMode: structs.HostVolumeAccessModeSingleNodeWriter, + }, + { + AttachmentMode: "bad", + AccessMode: "invalid", + }, + }, + } - * volume validation failed: 3 errors occurred: + err = msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) + must.EqError(t, err, `volume validation failed: 3 errors occurred: * capacity_max (100000) must be larger than capacity_min (200000) * invalid attachment mode: "bad" * invalid constraint: 1 error occurred: @@ -121,20 +118,17 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { - - `) invalidNode := &structs.Node{ID: uuid.Generate(), NodePool: "does-not-exist"} volOnInvalidNode := mock.HostVolumeRequestForNode(ns, invalidNode) - req.Volumes = []*structs.HostVolume{volOnInvalidNode} + req.Volume = volOnInvalidNode err = msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) must.EqError(t, err, fmt.Sprintf( `validating volume "example" against state failed: node %q does not exist`, 
invalidNode.ID)) }) - var vol1ID, vol2ID string var expectIndex uint64 c1.setCreate(&cstructs.ClientHostVolumeCreateResponse{ @@ -148,46 +142,56 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { vol2 := mock.HostVolumeRequest("apps") vol2.Name = "example2" vol2.NodePool = "prod" - req.Volumes = []*structs.HostVolume{vol1, vol2} t.Run("invalid permissions", func(t *testing.T) { var resp structs.HostVolumeCreateResponse req.AuthToken = otherToken + + req.Volume = vol1 err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) must.EqError(t, err, "Permission denied") }) t.Run("invalid node constraints", func(t *testing.T) { - req.Volumes[0].Constraints[0].RTarget = "r2" - req.Volumes[1].Constraints[0].RTarget = "r2" + vol1.Constraints[0].RTarget = "r2" + vol2.Constraints[0].RTarget = "r2" defer func() { - req.Volumes[0].Constraints[0].RTarget = "r1" - req.Volumes[1].Constraints[0].RTarget = "r1" + vol1.Constraints[0].RTarget = "r1" + vol2.Constraints[0].RTarget = "r1" }() + req.Volume = vol1.Copy() var resp structs.HostVolumeCreateResponse req.AuthToken = token err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) - must.EqError(t, err, `2 errors occurred: - * could not place volume "example1": no node meets constraints - * could not place volume "example2": no node meets constraints + must.EqError(t, err, `could not place volume "example1": no node meets constraints`) -`) + req.Volume = vol2.Copy() + resp = structs.HostVolumeCreateResponse{} + err = msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) + must.EqError(t, err, `could not place volume "example2": no node meets constraints`) }) t.Run("valid create", func(t *testing.T) { var resp structs.HostVolumeCreateResponse req.AuthToken = token + req.Volume = vol1.Copy() err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) must.NoError(t, err) - must.Len(t, 2, resp.Volumes) - vol1ID = resp.Volumes[0].ID - vol2ID = resp.Volumes[1].ID + must.NotNil(t, resp.Volume) + vol1 = resp.Volume + expectIndex = resp.Index + req.Volume = vol2.Copy() + resp = structs.HostVolumeCreateResponse{} + err = msgpackrpc.CallWithCodec(codec, "HostVolume.Create", req, &resp) + must.NoError(t, err) + must.NotNil(t, resp.Volume) + vol2 = resp.Volume getReq := &structs.HostVolumeGetRequest{ - ID: vol1ID, + ID: vol1.ID, QueryOptions: structs.QueryOptions{ Region: srv.Region(), Namespace: ns, @@ -206,14 +210,11 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { t.Run("invalid updates", func(t *testing.T) { - vol1, err := store.HostVolumeByID(nil, ns, vol1ID, false) - must.NoError(t, err) - must.NotNil(t, vol1) invalidVol1 := vol1.Copy() invalidVol2 := &structs.HostVolume{} createReq := &structs.HostVolumeCreateRequest{ - Volumes: []*structs.HostVolume{invalidVol1, invalidVol2}, + Volume: invalidVol2, WriteRequest: structs.WriteRequest{ Region: srv.Region(), Namespace: ns, @@ -221,18 +222,18 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { } c1.setCreate(nil, errors.New("should not call this endpoint on invalid RPCs")) var createResp structs.HostVolumeCreateResponse - err = msgpackrpc.CallWithCodec(codec, "HostVolume.Create", createReq, &createResp) + err := msgpackrpc.CallWithCodec(codec, "HostVolume.Create", createReq, &createResp) must.EqError(t, err, `volume validation failed: 2 errors occurred: * missing name * must include at least one capability block -`, must.Sprint("initial validation failures should exit early even if there's another 
valid vol")) +`, must.Sprint("initial validation failures should exit early")) invalidVol1.NodeID = uuid.Generate() invalidVol1.RequestedCapacityMinBytes = 100 invalidVol1.RequestedCapacityMaxBytes = 200 registerReq := &structs.HostVolumeRegisterRequest{ - Volumes: []*structs.HostVolume{invalidVol1}, + Volume: invalidVol1, WriteRequest: structs.WriteRequest{ Region: srv.Region(), Namespace: ns, @@ -249,13 +250,10 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { }) t.Run("blocking Get unblocks on write", func(t *testing.T) { - vol1, err := store.HostVolumeByID(nil, ns, vol1ID, false) - must.NoError(t, err) - must.NotNil(t, vol1) nextVol1 := vol1.Copy() nextVol1.RequestedCapacityMaxBytes = 300000 registerReq := &structs.HostVolumeRegisterRequest{ - Volumes: []*structs.HostVolume{nextVol1}, + Volume: nextVol1, WriteRequest: structs.WriteRequest{ Region: srv.Region(), Namespace: ns, @@ -270,7 +268,7 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { errCh := make(chan error) getReq := &structs.HostVolumeGetRequest{ - ID: vol1ID, + ID: vol1.ID, QueryOptions: structs.QueryOptions{ Region: srv.Region(), Namespace: ns, @@ -294,7 +292,7 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { time.AfterFunc(200*time.Millisecond, func() { codec := rpcClient(t, srv) var registerResp structs.HostVolumeRegisterResponse - err = msgpackrpc.CallWithCodec(codec, "HostVolume.Register", registerReq, ®isterResp) + err := msgpackrpc.CallWithCodec(codec, "HostVolume.Register", registerReq, ®isterResp) must.NoError(t, err) }) @@ -309,9 +307,6 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { }) t.Run("delete blocked by allocation claims", func(t *testing.T) { - vol2, err := store.HostVolumeByID(nil, ns, vol2ID, false) - must.NoError(t, err) - must.NotNil(t, vol2) // claim one of the volumes with a pending allocation alloc := mock.MinAlloc() @@ -326,7 +321,7 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { index, []*structs.Allocation{alloc})) delReq := &structs.HostVolumeDeleteRequest{ - VolumeIDs: []string{vol1ID, vol2ID}, + VolumeIDs: []string{vol1.ID, vol2.ID}, WriteRequest: structs.WriteRequest{ Region: srv.Region(), Namespace: ns, @@ -334,16 +329,16 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { } var delResp structs.HostVolumeDeleteResponse - err = msgpackrpc.CallWithCodec(codec, "HostVolume.Delete", delReq, &delResp) + err := msgpackrpc.CallWithCodec(codec, "HostVolume.Delete", delReq, &delResp) must.EqError(t, err, "Permission denied") delReq.AuthToken = powerToken err = msgpackrpc.CallWithCodec(codec, "HostVolume.Delete", delReq, &delResp) - must.EqError(t, err, fmt.Sprintf("volume %s in use by allocations: [%s]", vol2ID, alloc.ID)) + must.EqError(t, err, fmt.Sprintf("volume %s in use by allocations: [%s]", vol2.ID, alloc.ID)) // volume not in use will be deleted even if we got an error getReq := &structs.HostVolumeGetRequest{ - ID: vol1ID, + ID: vol1.ID, QueryOptions: structs.QueryOptions{ Region: srv.Region(), Namespace: ns, @@ -366,11 +361,11 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { } err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", nArgs, &structs.GenericResponse{}) - delReq.VolumeIDs = []string{vol2ID} + delReq.VolumeIDs = []string{vol2.ID} err = msgpackrpc.CallWithCodec(codec, "HostVolume.Delete", delReq, &delResp) must.NoError(t, err) - getReq.ID = vol2ID + getReq.ID = vol2.ID err = msgpackrpc.CallWithCodec(codec, "HostVolume.Get", 
getReq, &getResp) must.NoError(t, err) must.Nil(t, getResp.Volume) @@ -378,6 +373,7 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) { } func TestHostVolumeEndpoint_List(t *testing.T) { + ci.Parallel(t) srv, rootToken, cleanupSrv := TestACLServer(t, func(c *Config) { c.NumSchedulers = 0 @@ -422,47 +418,51 @@ func TestHostVolumeEndpoint_List(t *testing.T) { vol1 := mock.HostVolumeRequestForNode(ns1, nodes[0]) vol1.Name = "foobar-example" - vol1.Parameters = map[string]string{"mockID": "vol1"} vol2 := mock.HostVolumeRequestForNode(ns1, nodes[1]) vol2.Name = "foobaz-example" - vol2.Parameters = map[string]string{"mockID": "vol2"} vol3 := mock.HostVolumeRequestForNode(ns2, nodes[2]) vol3.Name = "foobar-example" - vol3.Parameters = map[string]string{"mockID": "vol3"} vol4 := mock.HostVolumeRequestForNode(ns2, nodes[1]) vol4.Name = "foobaz-example" - vol4.Parameters = map[string]string{"mockID": "vol4"} // we need to register these rather than upsert them so we have the correct // indexes for unblocking later. registerReq := &structs.HostVolumeRegisterRequest{ - Volumes: []*structs.HostVolume{vol1, vol2, vol3, vol4}, WriteRequest: structs.WriteRequest{ Region: srv.Region(), AuthToken: rootToken.SecretID}, } var registerResp structs.HostVolumeRegisterResponse + + // write the volumes in reverse order so our later test can get a blocking + // query index from a Get it has access to + + registerReq.Volume = vol4 err := msgpackrpc.CallWithCodec(codec, "HostVolume.Register", registerReq, ®isterResp) must.NoError(t, err) + vol4 = registerResp.Volume - // IDs are generated by the server, so we need to read them back to figure - // out which mock got which ID - for _, vol := range registerResp.Volumes { - switch vol.Parameters["mockID"] { - case "vol1": - vol1 = vol - case "vol2": - vol2 = vol - case "vol3": - vol3 = vol - case "vol4": - vol4 = vol - } - } + registerReq.Volume = vol3 + registerResp = structs.HostVolumeRegisterResponse{} + err = msgpackrpc.CallWithCodec(codec, "HostVolume.Register", registerReq, ®isterResp) + must.NoError(t, err) + vol3 = registerResp.Volume + + registerReq.Volume = vol2 + registerResp = structs.HostVolumeRegisterResponse{} + err = msgpackrpc.CallWithCodec(codec, "HostVolume.Register", registerReq, ®isterResp) + must.NoError(t, err) + vol2 = registerResp.Volume + + registerReq.Volume = vol1 + registerResp = structs.HostVolumeRegisterResponse{} + err = msgpackrpc.CallWithCodec(codec, "HostVolume.Register", registerReq, ®isterResp) + must.NoError(t, err) + vol1 = registerResp.Volume testCases := []struct { name string @@ -568,21 +568,24 @@ func TestHostVolumeEndpoint_List(t *testing.T) { t.Run("blocking query unblocks", func(t *testing.T) { - // Get response will include the volume's Index to block on + // the Get response from the most-recently written volume will have the + // index we want to block on getReq := &structs.HostVolumeGetRequest{ ID: vol1.ID, QueryOptions: structs.QueryOptions{ Region: srv.Region(), - Namespace: vol1.Namespace, + Namespace: ns1, AuthToken: token, }, } var getResp structs.HostVolumeGetResponse err = msgpackrpc.CallWithCodec(codec, "HostVolume.Get", getReq, &getResp) + must.NoError(t, err) + must.NotNil(t, getResp.Volume) nextVol := getResp.Volume.Copy() nextVol.RequestedCapacityMaxBytes = 300000 - registerReq.Volumes = []*structs.HostVolume{nextVol} + registerReq.Volume = nextVol registerReq.Namespace = nextVol.Namespace ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -716,7 +719,8 @@ func 
TestHostVolumeEndpoint_placeVolume(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - node, err := endpoint.placeHostVolume(tc.vol) + snap, _ := store.Snapshot() + node, err := endpoint.placeHostVolume(snap, tc.vol) if tc.expectErr == "" { must.NoError(t, err) must.Eq(t, tc.expect, node) @@ -788,6 +792,9 @@ func (v *mockHostVolumeClient) Create( resp *cstructs.ClientHostVolumeCreateResponse) error { v.lock.Lock() defer v.lock.Unlock() + if v.nextCreateResponse == nil { + return nil // prevents panics from incorrect tests + } *resp = *v.nextCreateResponse return v.nextCreateErr } diff --git a/nomad/search_endpoint_test.go b/nomad/search_endpoint_test.go index ae9e10e33f..5f9695f3d2 100644 --- a/nomad/search_endpoint_test.go +++ b/nomad/search_endpoint_test.go @@ -1058,14 +1058,14 @@ func TestSearch_PrefixSearch_HostVolume(t *testing.T) { id := uuid.Generate() index++ - err := store.UpsertHostVolumes(index, []*structs.HostVolume{{ + err := store.UpsertHostVolume(index, &structs.HostVolume{ ID: id, Name: "example", Namespace: structs.DefaultNamespace, PluginID: "glade", NodeID: node.ID, NodePool: node.NodePool, - }}) + }) must.NoError(t, err) req := &structs.SearchRequest{ @@ -1998,14 +1998,14 @@ func TestSearch_FuzzySearch_HostVolume(t *testing.T) { id := uuid.Generate() index++ - err := store.UpsertHostVolumes(index, []*structs.HostVolume{{ + err := store.UpsertHostVolume(index, &structs.HostVolume{ ID: id, Name: "example", Namespace: structs.DefaultNamespace, PluginID: "glade", NodeID: node.ID, NodePool: node.NodePool, - }}) + }) must.NoError(t, err) req := &structs.FuzzySearchRequest{ diff --git a/nomad/state/state_store_host_volumes.go b/nomad/state/state_store_host_volumes.go index 522d1d1946..37d1cccd1a 100644 --- a/nomad/state/state_store_host_volumes.go +++ b/nomad/state/state_store_host_volumes.go @@ -51,55 +51,53 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs return vol, nil } -// UpsertHostVolumes upserts a set of host volumes -func (s *StateStore) UpsertHostVolumes(index uint64, volumes []*structs.HostVolume) error { +// UpsertHostVolume upserts a host volume +func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) error { txn := s.db.WriteTxn(index) defer txn.Abort() - for _, v := range volumes { - if exists, err := s.namespaceExists(txn, v.Namespace); err != nil { - return err - } else if !exists { - return fmt.Errorf("host volume %s is in nonexistent namespace %s", v.ID, v.Namespace) - } + if exists, err := s.namespaceExists(txn, vol.Namespace); err != nil { + return err + } else if !exists { + return fmt.Errorf("host volume %s is in nonexistent namespace %s", vol.ID, vol.Namespace) + } - obj, err := txn.First(TableHostVolumes, indexID, v.Namespace, v.ID) - if err != nil { - return err - } - if obj != nil { - old := obj.(*structs.HostVolume) - v.CreateIndex = old.CreateIndex - v.CreateTime = old.CreateTime - } else { - v.CreateIndex = index - } + obj, err := txn.First(TableHostVolumes, indexID, vol.Namespace, vol.ID) + if err != nil { + return err + } + if obj != nil { + old := obj.(*structs.HostVolume) + vol.CreateIndex = old.CreateIndex + vol.CreateTime = old.CreateTime + } else { + vol.CreateIndex = index + } - // If the fingerprint is written from the node before the create RPC - // handler completes, we'll never update from the initial pending, so - // reconcile that here - node, err := s.NodeByID(nil, v.NodeID) - if err != nil { - return err - } - if node == nil { - return 
fmt.Errorf("host volume %s has nonexistent node ID %s", v.ID, v.NodeID) - } - if _, ok := node.HostVolumes[v.Name]; ok { - v.State = structs.HostVolumeStateReady - } - // Register RPCs for new volumes may not have the node pool set - v.NodePool = node.NodePool + // If the fingerprint is written from the node before the create RPC handler + // completes, we'll never update from the initial pending, so reconcile that + // here + node, err := s.NodeByID(nil, vol.NodeID) + if err != nil { + return err + } + if node == nil { + return fmt.Errorf("host volume %s has nonexistent node ID %s", vol.ID, vol.NodeID) + } + if _, ok := node.HostVolumes[vol.Name]; ok { + vol.State = structs.HostVolumeStateReady + } + // Register RPCs for new volumes may not have the node pool set + vol.NodePool = node.NodePool - // Allocations are denormalized on read, so we don't want these to be - // written to the state store. - v.Allocations = nil - v.ModifyIndex = index + // Allocations are denormalized on read, so we don't want these to be + // written to the state store. + vol.Allocations = nil + vol.ModifyIndex = index - err = txn.Insert(TableHostVolumes, v) - if err != nil { - return fmt.Errorf("host volume insert: %w", err) - } + err = txn.Insert(TableHostVolumes, vol) + if err != nil { + return fmt.Errorf("host volume insert: %w", err) } if err := txn.Insert(tableIndex, &IndexEntry{TableHostVolumes, index}); err != nil { diff --git a/nomad/state/state_store_host_volumes_test.go b/nomad/state/state_store_host_volumes_test.go index 358591160c..8726959784 100644 --- a/nomad/state/state_store_host_volumes_test.go +++ b/nomad/state/state_store_host_volumes_test.go @@ -54,7 +54,10 @@ func TestStateStore_HostVolumes_CRUD(t *testing.T) { vols[3].NodePool = nodes[2].NodePool index++ - must.NoError(t, store.UpsertHostVolumes(index, vols)) + must.NoError(t, store.UpsertHostVolume(index, vols[0])) + must.NoError(t, store.UpsertHostVolume(index, vols[1])) + must.NoError(t, store.UpsertHostVolume(index, vols[2])) + must.NoError(t, store.UpsertHostVolume(index, vols[3])) vol, err := store.HostVolumeByID(nil, vols[0].Namespace, vols[0].ID, true) must.NoError(t, err) @@ -108,13 +111,13 @@ func TestStateStore_HostVolumes_CRUD(t *testing.T) { must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, nodes[2])) // update all the volumes, which should update the state of vol2 as well + index++ for i, vol := range vols { vol = vol.Copy() vol.RequestedCapacityMaxBytes = 300000 vols[i] = vol + must.NoError(t, store.UpsertHostVolume(index, vol)) } - index++ - must.NoError(t, store.UpsertHostVolumes(index, vols)) iter, err = store.HostVolumesByName(nil, structs.DefaultNamespace, "example", SortDefault) must.NoError(t, err) @@ -221,7 +224,10 @@ func TestStateStore_UpdateHostVolumesFromFingerprint(t *testing.T) { index++ oldIndex := index - must.NoError(t, store.UpsertHostVolumes(index, vols)) + must.NoError(t, store.UpsertHostVolume(index, vols[0])) + must.NoError(t, store.UpsertHostVolume(index, vols[1])) + must.NoError(t, store.UpsertHostVolume(index, vols[2])) + must.NoError(t, store.UpsertHostVolume(index, vols[3])) vol0, err := store.HostVolumeByID(nil, ns, vols[0].ID, false) must.NoError(t, err) diff --git a/nomad/structs/host_volumes.go b/nomad/structs/host_volumes.go index 11745526aa..2c8e6cf237 100644 --- a/nomad/structs/host_volumes.go +++ b/nomad/structs/host_volumes.go @@ -329,22 +329,38 @@ type HostVolumeStub struct { } type HostVolumeCreateRequest struct { - Volumes []*HostVolume + Volume *HostVolume + + // 
PolicyOverride is set when the user is attempting to override any + // Enterprise policy enforcement + PolicyOverride bool + WriteRequest } type HostVolumeCreateResponse struct { - Volumes []*HostVolume + Volume *HostVolume + + // Warnings are non-fatal messages from Enterprise policy enforcement + Warnings string WriteMeta } type HostVolumeRegisterRequest struct { - Volumes []*HostVolume + Volume *HostVolume + + // PolicyOverride is set when the user is attempting to override any + // Enterprise policy enforcement + PolicyOverride bool + WriteRequest } type HostVolumeRegisterResponse struct { - Volumes []*HostVolume + Volume *HostVolume + + // Warnings are non-fatal messages from Enterprise policy enforcement + Warnings string WriteMeta }
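
For illustration, a minimal sketch of how a caller would consume the now-singular Go API shown in api/host_volumes.go, assuming a locally reachable agent; the volume name and plugin ID below are placeholder values, not anything defined in this patch.

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Create now takes and returns a single volume rather than a slice.
	// A real volume spec must also include at least one capability block
	// (per the server-side validation above); omitted here for brevity.
	req := &api.HostVolumeCreateRequest{
		Volume: &api.HostVolume{
			Name:     "example",        // placeholder name
			PluginID: "example_plugin", // placeholder plugin ID
		},
	}
	vol, _, err := client.HostVolumes().Create(req, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("created host volume %s with ID %s (state %s)\n",
		vol.Name, vol.ID, vol.State)
}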
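
At the HTTP layer, /v1/volume/host/create and /v1/volume/host/register now expect a single Volume object in the body rather than a Volumes slice, as exercised by the agent endpoint test above. A minimal sketch of that request shape, assuming the default local agent address; the field values are placeholders.

package main

import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"
)

func main() {
	// The body now carries "Volume" (singular), not "Volumes".
	body, err := json.Marshal(map[string]any{
		"Volume": map[string]any{
			"Name":     "example",        // placeholder
			"PluginID": "example_plugin", // placeholder
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	req, err := http.NewRequest(http.MethodPut,
		"http://127.0.0.1:4646/v1/volume/host/create", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println("status:", resp.Status)
}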